This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a936637a [lake/iceberg] implement lake writer for iceberg pk table 
(#1555)
8a936637a is described below

commit 8a936637a4b023fa827a73ee6474d894b2f6ebe0
Author: xx789 <[email protected]>
AuthorDate: Tue Aug 19 11:56:10 2025 +0800

    [lake/iceberg] implement lake writer for iceberg pk table (#1555)
---
 .../lake/iceberg/tiering/IcebergLakeCommitter.java |  68 +++++++---
 .../lake/iceberg/tiering/IcebergLakeWriter.java    |  85 ++++++++++--
 .../{append => writer}/AppendOnlyWriter.java       |  47 ++-----
 .../iceberg/tiering/writer/DeltaTaskWriter.java    | 112 ++++++++++++++++
 .../tiering/writer/GenericTaskDeltaWriter.java     |  80 ++++++++++++
 .../fluss/lake/iceberg/IcebergTieringTest.java     | 145 +++++++++++++++++----
 6 files changed, 443 insertions(+), 94 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 6353fb06c..d3009ee0e 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -28,7 +28,10 @@ import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotUpdate;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -40,6 +43,7 @@ import org.apache.iceberg.io.WriteResult;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -77,6 +81,10 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
             for (DataFile dataFile : writeResult.dataFiles()) {
                 builder.addDataFile(dataFile);
             }
+            // Add delete files
+            for (DeleteFile deleteFile : writeResult.deleteFiles()) {
+                builder.addDeleteFile(deleteFile);
+            }
         }
 
         return builder.build();
@@ -88,22 +96,36 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
         try {
             // Refresh table to get latest metadata
             icebergTable.refresh();
-            // Simple append-only case: only data files, no delete files or 
compaction
-            AppendFiles appendFiles = icebergTable.newAppend();
-            for (DataFile dataFile : committable.getDataFiles()) {
-                appendFiles.appendFile(dataFile);
-            }
-            if (!committable.getDeleteFiles().isEmpty()) {
-                throw new IllegalStateException(
-                        "Delete files are not supported in append-only mode. "
-                                + "Found "
-                                + committable.getDeleteFiles().size()
-                                + " delete files.");
-            }
 
-            addFlussProperties(appendFiles, snapshotProperties);
+            if (committable.getDeleteFiles().isEmpty()) {
+                // Simple append-only case: only data files, no delete files 
or compaction
+                AppendFiles appendFiles = icebergTable.newAppend();
+                for (DataFile dataFile : committable.getDataFiles()) {
+                    appendFiles.appendFile(dataFile);
+                }
 
-            appendFiles.commit();
+                addFlussProperties(appendFiles, snapshotProperties);
+
+                appendFiles.commit();
+            } else {
+                /**
+                 * Row delta validations are not needed for streaming changes 
that write equality
+                 * deletes. Equality deletes are applied to data in all 
previous sequence numbers,
+                 * so retries may push deletes further in the future, but do 
not affect correctness.
+                 * Position deletes committed to the table in this path are 
used only to delete rows
+                 * from data files that are being added in this commit. There 
is no way for data
+                 * files added along with the delete files to be concurrently 
removed, so there is
+                 * no need to validate the files referenced by the position 
delete files that are
+                 * being committed.
+                 */
+                RowDelta rowDelta = icebergTable.newRowDelta();
+                
Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
+                        .forEach(rowDelta::addRows);
+                
Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
+                        .forEach(rowDelta::addDeletes);
+                snapshotProperties.forEach(rowDelta::set);
+                rowDelta.commit();
+            }
 
             Long commitSnapshotId = currentCommitSnapshotId.get();
             currentCommitSnapshotId.remove();
@@ -116,20 +138,26 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
     }
 
     private void addFlussProperties(
-            AppendFiles appendFiles, Map<String, String> snapshotProperties) {
-        appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+            SnapshotUpdate<?> operation, Map<String, String> 
snapshotProperties) {
+        operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
         for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
-            appendFiles.set(entry.getKey(), entry.getValue());
+            operation.set(entry.getKey(), entry.getValue());
         }
     }
 
     @Override
     public void abort(IcebergCommittable committable) {
-        List<String> filesToDelete =
+        List<String> dataFilesToDelete =
                 committable.getDataFiles().stream()
-                        .map(dataFile -> dataFile.path().toString())
+                        .map(file -> file.path().toString())
+                        .collect(Collectors.toList());
+        CatalogUtil.deleteFiles(icebergTable.io(), dataFilesToDelete, "data 
file", true);
+
+        List<String> deleteFilesToDelete =
+                committable.getDeleteFiles().stream()
+                        .map(file -> file.path().toString())
                         .collect(Collectors.toList());
-        CatalogUtil.deleteFiles(icebergTable.io(), filesToDelete, "data file", 
true);
+        CatalogUtil.deleteFiles(icebergTable.io(), deleteFilesToDelete, 
"delete file", true);
     }
 
     @Nullable
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index e75378941..dd8e55e37 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,21 +17,32 @@
 
 package com.alibaba.fluss.lake.iceberg.tiering;
 
-import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.AppendOnlyWriter;
+import com.alibaba.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
 import com.alibaba.fluss.lake.writer.LakeWriter;
 import com.alibaba.fluss.lake.writer.WriterInitContext;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.record.LogRecord;
 
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.PropertyUtil;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
@@ -52,19 +63,51 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
     }
 
     private RecordWriter createRecordWriter(WriterInitContext 
writerInitContext) {
-        if (!icebergTable.spec().isUnpartitioned()) {
-            throw new UnsupportedOperationException("Partitioned tables are 
not yet supported");
+        Schema schema = icebergTable.schema();
+        List<Integer> equalityFieldIds = new 
ArrayList<>(schema.identifierFieldIds());
+        PartitionSpec spec = icebergTable.spec();
+
+        // Get target file size from table properties
+        long targetFileSize = targetFileSize(icebergTable);
+        FileFormat format = fileFormat(icebergTable);
+        OutputFileFactory outputFileFactory =
+                OutputFileFactory.builderFor(
+                                icebergTable,
+                                writerInitContext.tableBucket().getBucket(),
+                                // task id always 0
+                                0)
+                        .format(format)
+                        .build();
+
+        if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
+            if (spec.isUnpartitioned()) {
+                return new AppendOnlyWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            } else {
+                return null;
+            }
+        } else {
+            if (spec.isUnpartitioned()) {
+                return new DeltaTaskWriter(
+                        icebergTable,
+                        writerInitContext.schema().getRowType(),
+                        writerInitContext.tableBucket(),
+                        null, // No partition for non-partitioned table
+                        Collections.emptyList(), // No partition keys
+                        format,
+                        outputFileFactory,
+                        targetFileSize);
+            } else {
+                return null;
+            }
         }
-
-        // For now, assume append-only (no primary keys)
-
-        return new AppendOnlyWriter(
-                icebergTable,
-                writerInitContext.schema().getRowType(),
-                writerInitContext.tableBucket(),
-                null, // No partition for non-partitioned table
-                Collections.emptyList() // No partition keys
-                );
     }
 
     @Override
@@ -107,4 +150,20 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
             throw new IOException("Failed to get table " + tablePath + " in 
Iceberg.", e);
         }
     }
+
+    private static FileFormat fileFormat(Table icebergTable) {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.fromString(formatString);
+    }
+
+    private static long targetFileSize(Table icebergTable) {
+        return PropertyUtil.propertyAsLong(
+                icebergTable.properties(),
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
similarity index 60%
rename from 
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
rename to 
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
index 9c5607477..0b8ace18c 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/AppendOnlyWriter.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.iceberg.tiering.append;
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
 
 import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
 import com.alibaba.fluss.metadata.TableBucket;
@@ -24,22 +24,17 @@ import com.alibaba.fluss.types.RowType;
 
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
-import org.apache.iceberg.util.PropertyUtil;
 
 import javax.annotation.Nullable;
 
 import java.util.List;
 
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
 /** A {@link RecordWriter} to write to Iceberg's append-only table. */
 public class AppendOnlyWriter extends RecordWriter {
 
@@ -48,9 +43,12 @@ public class AppendOnlyWriter extends RecordWriter {
             RowType flussRowType,
             TableBucket tableBucket,
             @Nullable String partition,
-            List<String> partitionKeys) {
+            List<String> partitionKeys,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         super(
-                createTaskWriter(icebergTable, tableBucket),
+                createTaskWriter(icebergTable, format, outputFileFactory, 
targetFileSize),
                 icebergTable.schema(),
                 flussRowType,
                 tableBucket,
@@ -59,21 +57,12 @@ public class AppendOnlyWriter extends RecordWriter {
     }
 
     private static TaskWriter<Record> createTaskWriter(
-            Table icebergTable, TableBucket tableBucket) {
-        // Get target file size from table properties
-        long targetFileSize = targetFileSize(icebergTable);
-
+            Table icebergTable,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
         FileAppenderFactory<Record> fileAppenderFactory =
                 new GenericAppenderFactory(icebergTable.schema());
-        FileFormat format = fileFormat(icebergTable);
-        OutputFileFactory outputFileFactory =
-                OutputFileFactory.builderFor(
-                                icebergTable,
-                                tableBucket.getBucket(),
-                                // task id always 0
-                                0)
-                        .format(format)
-                        .build();
 
         return new UnpartitionedWriter<>(
                 icebergTable.spec(),
@@ -89,20 +78,4 @@ public class AppendOnlyWriter extends RecordWriter {
         flussRecordAsIcebergRecord.setFlussRecord(record);
         taskWriter.write(flussRecordAsIcebergRecord);
     }
-
-    private static FileFormat fileFormat(Table icebergTable) {
-        String formatString =
-                PropertyUtil.propertyAsString(
-                        icebergTable.properties(),
-                        TableProperties.DEFAULT_FILE_FORMAT,
-                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.fromString(formatString);
-    }
-
-    private static long targetFileSize(Table icebergTable) {
-        return PropertyUtil.propertyAsLong(
-                icebergTable.properties(),
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
new file mode 100644
index 000000000..4f16a5ce5
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A {@link RecordWriter} to write to Iceberg's primary-key table. */
+public class DeltaTaskWriter extends RecordWriter {
+
+    public DeltaTaskWriter(
+            Table icebergTable,
+            RowType flussRowType,
+            TableBucket tableBucket,
+            @Nullable String partition,
+            List<String> partitionKeys,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
+        super(
+                createTaskWriter(icebergTable, format, outputFileFactory, 
targetFileSize),
+                icebergTable.schema(),
+                flussRowType,
+                tableBucket,
+                partition,
+                partitionKeys);
+    }
+
+    private static TaskWriter<Record> createTaskWriter(
+            Table icebergTable,
+            FileFormat format,
+            OutputFileFactory outputFileFactory,
+            long targetFileSize) {
+        int[] equalityFieldIds =
+                icebergTable.schema().identifierFieldIds().stream()
+                        .mapToInt(Integer::intValue)
+                        .toArray();
+        FileAppenderFactory<Record> appenderFactory =
+                new GenericAppenderFactory(
+                        icebergTable.schema(),
+                        icebergTable.spec(),
+                        equalityFieldIds,
+                        icebergTable.schema(),
+                        null);
+
+        List<String> columns = new ArrayList<>();
+        for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
+            columns.add(icebergTable.schema().findField(fieldId).name());
+        }
+        Schema deleteSchema = icebergTable.schema().select(columns);
+        return new GenericTaskDeltaWriter(
+                icebergTable.schema(),
+                deleteSchema,
+                icebergTable.spec(),
+                format,
+                appenderFactory,
+                outputFileFactory,
+                icebergTable.io(),
+                targetFileSize);
+    }
+
+    @Override
+    public void write(LogRecord record) throws Exception {
+        GenericTaskDeltaWriter deltaWriter = (GenericTaskDeltaWriter) 
taskWriter;
+        flussRecordAsIcebergRecord.setFlussRecord(record);
+        switch (record.getChangeType()) {
+            case INSERT:
+            case UPDATE_AFTER:
+                deltaWriter.write(flussRecordAsIcebergRecord);
+                break;
+            case UPDATE_BEFORE:
+            case DELETE:
+                // TODO we can project the record and only write the equality 
delete fields
+                deltaWriter.delete(flussRecordAsIcebergRecord);
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown row kind: " + record.getChangeType());
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
new file mode 100644
index 000000000..65ab644dd
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/writer/GenericTaskDeltaWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+
+import java.io.IOException;
+
+/** A generic task equality delta writer. */
+class GenericTaskDeltaWriter extends BaseTaskWriter<Record> {
+    private final GenericEqualityDeltaWriter deltaWriter;
+
+    public GenericTaskDeltaWriter(
+            Schema schema,
+            Schema deleteSchema,
+            PartitionSpec spec,
+            FileFormat format,
+            FileAppenderFactory<Record> appenderFactory,
+            OutputFileFactory fileFactory,
+            FileIO io,
+            long targetFileSize) {
+        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
+        this.deltaWriter = new GenericEqualityDeltaWriter(null, schema, 
deleteSchema);
+    }
+
+    @Override
+    public void write(Record row) throws IOException {
+        deltaWriter.write(row);
+    }
+
+    public void delete(Record row) throws IOException {
+        deltaWriter.delete(row);
+    }
+
+    @Override
+    public void close() throws IOException {
+        deltaWriter.close();
+    }
+
+    private class GenericEqualityDeltaWriter extends BaseEqualityDeltaWriter {
+        private GenericEqualityDeltaWriter(
+                PartitionKey partition, Schema schema, Schema eqDeleteSchema) {
+            super(partition, schema, eqDeleteSchema);
+        }
+
+        @Override
+        protected StructLike asStructLike(Record record) {
+            return record;
+        }
+
+        @Override
+        protected StructLike asStructLikeKey(Record record) {
+            return record;
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
index e8455f0df..7e15f41e1 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -34,6 +34,7 @@ import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.row.BinaryString;
 import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
 
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
@@ -47,8 +48,10 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import javax.annotation.Nullable;
 
@@ -57,15 +60,23 @@ import java.io.IOException;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Stream;
 
 import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
@@ -87,14 +98,23 @@ class IcebergTieringTest {
         icebergLakeTieringFactory = new 
IcebergLakeTieringFactory(configuration);
     }
 
-    @Test
-    void testTieringWriteTable() throws Exception {
-        TablePath tablePath = TablePath.of("iceberg", "test_table");
-        createTable(tablePath);
-
-        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+    private static Stream<Arguments> tieringWriteArgs() {
+        return Stream.of(Arguments.of(true), Arguments.of(false));
+    }
 
+    @ParameterizedTest
+    @MethodSource("tieringWriteArgs")
+    void testTieringWriteTable(boolean isPrimaryKeyTable) throws Exception {
         int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "iceberg",
+                        String.format(
+                                "test_tiering_table_%s",
+                                isPrimaryKeyTable ? "primary_key" : "log"));
+        createTable(tablePath, isPrimaryKeyTable);
+
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
 
         Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
 
@@ -107,11 +127,17 @@ class IcebergTieringTest {
         // first, write data
         for (int bucket = 0; bucket < bucketNum; bucket++) {
             try (LakeWriter<IcebergWriteResult> writer = 
createLakeWriter(tablePath, bucket)) {
-                List<LogRecord> records = genLogTableRecords(bucket, 5);
-                for (LogRecord record : records) {
+                Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords 
=
+                        isPrimaryKeyTable
+                                ? genPrimaryKeyTableRecords(bucket)
+                                : genLogTableRecords(bucket, 10);
+
+                List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                recordsByBucket.put(bucket, expectRecords);
+                for (LogRecord record : writtenRecords) {
                     writer.write(record);
                 }
-                recordsByBucket.put(bucket, records);
                 IcebergWriteResult result = writer.complete();
                 byte[] serialized = writeResultSerializer.serialize(result);
                 icebergWriteResults.add(
@@ -142,7 +168,11 @@ class IcebergTieringTest {
         for (int bucket = 0; bucket < 3; bucket++) {
             List<LogRecord> expectRecords = recordsByBucket.get(bucket);
             CloseableIterator<Record> actualRecords = 
getIcebergRows(icebergTable, bucket);
-            verifyLogTableRecords(actualRecords, bucket, expectRecords);
+            if (isPrimaryKeyTable) {
+                verifyTableRecords(actualRecords, expectRecords, bucket);
+            } else {
+                verifyTableRecords(actualRecords, expectRecords, bucket);
+            }
         }
     }
 
@@ -187,7 +217,8 @@ class IcebergTieringTest {
         return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
     }
 
-    private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
+    private Tuple2<List<LogRecord>, List<LogRecord>> genLogTableRecords(
+            int bucket, int numRecords) {
         List<LogRecord> logRecords = new ArrayList<>();
         for (int i = 0; i < numRecords; i++) {
             GenericRow genericRow = new GenericRow(3);
@@ -200,10 +231,67 @@ class IcebergTieringTest {
                             i, System.currentTimeMillis(), ChangeType.APPEND_ONLY, genericRow);
             logRecords.add(logRecord);
         }
-        return logRecords;
+        return Tuple2.of(logRecords, logRecords);
     }
 
-    private void createTable(TablePath tablePath) throws Exception {
+    private Tuple2<List<LogRecord>, List<LogRecord>> genPrimaryKeyTableRecords(int bucket) {
+        int offset = -1;
+        // gen +I, -U, +U, -D
+        List<GenericRow> rows = genKvRow(bucket, 0, 0, 4);
+        List<LogRecord> writtenLogRecords =
+                new ArrayList<>(
+                        Arrays.asList(
+                                toRecord(++offset, rows.get(0), INSERT),
+                                toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+                                toRecord(++offset, rows.get(2), UPDATE_AFTER),
+                                toRecord(++offset, rows.get(3), DELETE)));
+        List<LogRecord> expectLogRecords = new ArrayList<>();
+
+        // gen +I, -U, +U
+        rows = genKvRow(bucket, 1, 4, 7);
+        writtenLogRecords.addAll(
+                Arrays.asList(
+                        toRecord(++offset, rows.get(0), INSERT),
+                        toRecord(++offset, rows.get(1), UPDATE_BEFORE),
+                        toRecord(++offset, rows.get(2), UPDATE_AFTER)));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        // gen +I, +U
+        rows = genKvRow(bucket, 2, 7, 9);
+        writtenLogRecords.addAll(
+                Arrays.asList(
+                        toRecord(++offset, rows.get(0), INSERT),
+                        toRecord(++offset, rows.get(1), UPDATE_AFTER)));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        // gen +I
+        rows = genKvRow(bucket, 3, 9, 10);
+        writtenLogRecords.add(toRecord(++offset, rows.get(0), INSERT));
+        expectLogRecords.add(writtenLogRecords.get(writtenLogRecords.size() - 1));
+
+        return Tuple2.of(writtenLogRecords, expectLogRecords);
+    }
+
+    private List<GenericRow> genKvRow(int bucket, int key, int from, int to) {
+        List<GenericRow> rows = new ArrayList<>();
+        for (int i = from; i < to; i++) {
+            GenericRow genericRow;
+            // Non-partitioned table
+            genericRow = new GenericRow(3);
+            genericRow.setField(0, key);
+            genericRow.setField(1, BinaryString.fromString("bucket" + bucket + "_" + i));
+            genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+
+            rows.add(genericRow);
+        }
+        return rows;
+    }
+
+    private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeType) {
+        return new GenericRecord(offset, System.currentTimeMillis(), changeType, row);
+    }
+
+    private void createTable(TablePath tablePath, boolean isPrimaryTable) throws Exception {
         Namespace namespace = Namespace.of(tablePath.getDatabaseName());
         if (icebergCatalog instanceof SupportsNamespaces) {
             SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
@@ -212,15 +300,24 @@ class IcebergTieringTest {
             }
         }
 
+        Set<Integer> identifierFieldIds = new HashSet<>();
+        if (isPrimaryTable) {
+            identifierFieldIds.add(1);
+        }
+
         org.apache.iceberg.Schema schema =
                 new org.apache.iceberg.Schema(
-                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
-                        Types.NestedField.optional(2, "c2", Types.StringType.get()),
-                        Types.NestedField.optional(3, "c3", Types.StringType.get()),
-                        Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
-                        Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get()),
-                        Types.NestedField.required(
-                                6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+                        Arrays.asList(
+                                Types.NestedField.required(1, "c1", Types.IntegerType.get()),
+                                Types.NestedField.optional(2, "c2", Types.StringType.get()),
+                                Types.NestedField.optional(3, "c3", Types.StringType.get()),
+                                Types.NestedField.required(
+                                        4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        5, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                                Types.NestedField.required(
+                                        6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
+                        identifierFieldIds);
 
         TableIdentifier tableId =
                 TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
@@ -234,10 +331,10 @@ class IcebergTieringTest {
                 .iterator();
     }
 
-    private void verifyLogTableRecords(
+    private void verifyTableRecords(
             CloseableIterator<Record> actualRecords,
-            int expectBucket,
-            List<LogRecord> expectRecords) {
+            List<LogRecord> expectRecords,
+            int expectBucket) {
         for (LogRecord expectRecord : expectRecords) {
             Record actualRecord = actualRecords.next();
             // check business columns:


Reply via email to