This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new dbbfc79c0 [lake/iceberg] Introduce IcebergRewriteDataFiles to compact 
files (#1552)
dbbfc79c0 is described below

commit dbbfc79c0a8402a7354878af933eaa99ecb90008
Author: MehulBatra <[email protected]>
AuthorDate: Wed Aug 27 11:41:01 2025 +0530

    [lake/iceberg] Introduce IcebergRewriteDataFiles to compact files (#1552)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |   1 -
 .../maintenance/IcebergRewriteDataFiles.java       | 182 ++++++++++++++
 .../iceberg/maintenance/RewriteDataFileResult.java |  56 +++++
 .../lake/iceberg/tiering/IcebergCommittable.java   |  32 ++-
 .../lake/iceberg/tiering/IcebergLakeCommitter.java | 102 ++++++--
 .../lake/iceberg/tiering/IcebergLakeWriter.java    | 133 +++++++---
 .../lake/iceberg/tiering/IcebergWriteResult.java   |  20 +-
 .../tiering/IcebergWriteResultSerializer.java      |  59 ++++-
 .../tiering/writer/AppendOnlyTaskWriter.java       |  29 +--
 .../iceberg/tiering/writer/DeltaTaskWriter.java    |  49 +---
 .../writer/GenericRecordAppendOnlyWriter.java      |  14 +-
 .../tiering/writer/GenericRecordDeltaWriter.java   |  12 +-
 .../iceberg/tiering/writer/TaskWriterFactory.java  | 111 +++++++++
 .../lake/iceberg/utils/IcebergConversions.java     |  27 +++
 .../apache/iceberg/data/IcebergGenericReader.java  |  33 +++
 .../iceberg/maintenance/IcebergRewriteTest.java    | 268 +++++++++++++++++++++
 .../tiering/IcebergWriteResultSerializerTest.java  | 109 +++++++++
 17 files changed, 1073 insertions(+), 164 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 5ad616def..f80cec66b 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -31,7 +31,6 @@
     <name>Fluss : Lake : Iceberg</name>
 
     <packaging>jar</packaging>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
new file mode 100644
index 000000000..a74507655
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteDataFiles.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.metadata.TableBucket;
+
+import org.apache.iceberg.BaseCombinedScanTask;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.util.BinPacking;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toFilterExpression;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * Concrete implementation for Fluss's Iceberg integration. Handles 
bin-packing compaction of small
+ * files into larger ones.
+ */
+public class IcebergRewriteDataFiles {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergRewriteDataFiles.class);
+
+    private static final int MIN_FILES_TO_COMPACT = 3;
+
+    private final Table table;
+    private final String partition;
+    private final TableBucket bucket;
+    private final Expression filter;
+    private long targetSizeInBytes = 128 * 1024 * 1024; // 128MB default
+
+    public IcebergRewriteDataFiles(Table table, @Nullable String partition, 
TableBucket bucket) {
+        this.table = table;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.filter = toFilterExpression(table, partition, bucket.getBucket());
+    }
+
+    public IcebergRewriteDataFiles targetSizeInBytes(long targetSize) {
+        this.targetSizeInBytes = targetSize;
+        return this;
+    }
+
+    private List<CombinedScanTask> planRewriteFileGroups() throws IOException {
+        List<FileScanTask> fileScanTasks = new ArrayList<>();
+        try (CloseableIterable<FileScanTask> tasks =
+                
table.newScan().includeColumnStats().filter(filter).ignoreResiduals().planFiles())
 {
+            tasks.forEach(fileScanTasks::add);
+        }
+
+        // skip compaction when fewer than MIN_FILES_TO_COMPACT files are smaller than
+        // targetSizeInBytes
+        if (fileScanTasks.stream()
+                        .filter(fileScanTask -> fileScanTask.length() < 
targetSizeInBytes)
+                        .count()
+                < MIN_FILES_TO_COMPACT) {
+            // return empty file group
+            return Collections.emptyList();
+        }
+
+        // then pack the fileScanTasks into compaction units made of compactable
+        // fileScanTasks; the compacted output should still be ordered by the __offset
+        // column, so sort the tasks by the __offset column first
+        int offsetFieldId = 
table.schema().findField(OFFSET_COLUMN_NAME).fieldId();
+        fileScanTasks.sort(sortFileScanTask(offsetFieldId));
+
+        // pack the sorted tasks into bins now
+        BinPacking.ListPacker<FileScanTask> packer =
+                new BinPacking.ListPacker<>(targetSizeInBytes, 1, false);
+        return packer.pack(fileScanTasks, ContentScanTask::length).stream()
+                .filter(tasks -> tasks.size() > 1)
+                .map(BaseCombinedScanTask::new)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Builds a comparator that orders file scan tasks by the lower bound recorded for the
+     * given field in each data file's column statistics.
+     *
+     * @param sortFiledId the Iceberg field id whose lower bound is used as the sort key
+     * @return a comparator sorting tasks in ascending order of that lower bound
+     */
+    private Comparator<FileScanTask> sortFileScanTask(int sortFiledId) {
+        return Comparator.comparingLong(task -> lowerBoundAsLong(task, sortFiledId));
+    }
+
+    /**
+     * Decodes the lower bound of the given field as a little-endian long read from index 0.
+     *
+     * <p>The stored buffer is shared with the data file metadata, so it is read through a
+     * duplicate with an absolute get; its byte order and position are left untouched.
+     */
+    private static long lowerBoundAsLong(FileScanTask task, int fieldId) {
+        ByteBuffer lowerBound =
+                task.file().lowerBounds() == null ? null : task.file().lowerBounds().get(fieldId);
+        checkState(
+                lowerBound != null,
+                "Missing lower bound of field id " + fieldId + " for file " + task.file().path());
+        return lowerBound.duplicate().order(ByteOrder.LITTLE_ENDIAN).getLong(0);
+    }
+
+    @Nullable
+    public RewriteDataFileResult execute() {
+        try {
+            // plan the file groups to be rewritten
+            List<CombinedScanTask> tasksToRewrite = planRewriteFileGroups();
+            if (tasksToRewrite.isEmpty()) {
+                return null;
+            }
+            LOG.info("Start to rewrite files {}.", tasksToRewrite);
+            List<DataFile> deletedDataFiles = new ArrayList<>();
+            List<DataFile> addedDataFiles = new ArrayList<>();
+            for (CombinedScanTask combinedScanTask : tasksToRewrite) {
+                addedDataFiles.addAll(rewriteFileGroup(combinedScanTask));
+                deletedDataFiles.addAll(
+                        combinedScanTask.files().stream()
+                                .map(ContentScanTask::file)
+                                .collect(Collectors.toList()));
+            }
+            LOG.info("Finish rewriting files from {} to {}.", 
deletedDataFiles, addedDataFiles);
+            return new RewriteDataFileResult(deletedDataFiles, addedDataFiles);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format("Fail to compact bucket %s of table %s.", 
bucket, table.name()),
+                    e);
+        }
+    }
+
+    private List<DataFile> rewriteFileGroup(CombinedScanTask combinedScanTask) 
throws IOException {
+        try (CloseableIterable<Record> records = 
readDataFile(combinedScanTask);
+                TaskWriter<Record> taskWriter =
+                        TaskWriterFactory.createTaskWriter(table, partition, 
bucket.getBucket())) {
+            for (Record record : records) {
+                taskWriter.write(record);
+            }
+            WriteResult rewriteResult = taskWriter.complete();
+            checkState(
+                    rewriteResult.deleteFiles().length == 0,
+                    "the delete files should be empty, but got "
+                            + Arrays.toString(rewriteResult.deleteFiles()));
+            return Arrays.asList(rewriteResult.dataFiles());
+        }
+    }
+
+    private CloseableIterable<Record> readDataFile(CombinedScanTask 
combinedScanTask) {
+        IcebergGenericReader reader = new 
IcebergGenericReader(table.newScan(), true);
+        return reader.open(combinedScanTask);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
new file mode 100644
index 000000000..e24580cab
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/maintenance/RewriteDataFileResult.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.iceberg.DataFile;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** The result of rewriting Iceberg data files. */
+public class RewriteDataFileResult implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<DataFile> deletedDataFiles;
+    private final List<DataFile> addedDataFiles;
+
+    public RewriteDataFileResult(List<DataFile> deletedDataFiles, 
List<DataFile> addedDataFiles) {
+        this.deletedDataFiles = deletedDataFiles;
+        this.addedDataFiles = addedDataFiles;
+    }
+
+    public List<DataFile> deletedDataFiles() {
+        return deletedDataFiles;
+    }
+
+    public List<DataFile> addedDataFiles() {
+        return addedDataFiles;
+    }
+
+    @Override
+    public String toString() {
+        return "RewriteDataFileResult{"
+                + "deletedDataFiles="
+                + deletedDataFiles
+                + ", addedDataFiles="
+                + addedDataFiles
+                + '}';
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
index 1019eab45..f476f0ef3 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCommittable.java
@@ -17,6 +17,8 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 
@@ -32,9 +34,15 @@ public class IcebergCommittable implements Serializable {
     private final List<DataFile> dataFiles;
     private final List<DeleteFile> deleteFiles;
 
-    private IcebergCommittable(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles) {
+    private final List<RewriteDataFileResult> rewriteDataFiles;
+
+    private IcebergCommittable(
+            List<DataFile> dataFiles,
+            List<DeleteFile> deleteFiles,
+            List<RewriteDataFileResult> rewriteDataFiles) {
         this.dataFiles = dataFiles;
         this.deleteFiles = deleteFiles;
+        this.rewriteDataFiles = rewriteDataFiles;
     }
 
     public List<DataFile> getDataFiles() {
@@ -45,6 +53,10 @@ public class IcebergCommittable implements Serializable {
         return deleteFiles;
     }
 
+    public List<RewriteDataFileResult> rewriteDataFileResults() {
+        return rewriteDataFiles;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -57,6 +69,8 @@ public class IcebergCommittable implements Serializable {
         private final List<DataFile> dataFiles = new ArrayList<>();
         private final List<DeleteFile> deleteFiles = new ArrayList<>();
 
+        private final List<RewriteDataFileResult> rewriteDataFileResults = new 
ArrayList<>();
+
         public Builder addDataFile(DataFile dataFile) {
             this.dataFiles.add(dataFile);
             return this;
@@ -67,8 +81,16 @@ public class IcebergCommittable implements Serializable {
             return this;
         }
 
+        public Builder addRewriteDataFileResult(RewriteDataFileResult 
rewriteDataFileResult) {
+            this.rewriteDataFileResults.add(rewriteDataFileResult);
+            return this;
+        }
+
         public IcebergCommittable build() {
-            return new IcebergCommittable(new ArrayList<>(dataFiles), new 
ArrayList<>(deleteFiles));
+            // copy every list so later builder mutations cannot leak into the committable
+            return new IcebergCommittable(
+                    new ArrayList<>(dataFiles),
+                    new ArrayList<>(deleteFiles),
+                    new ArrayList<>(rewriteDataFileResults));
         }
     }
 
@@ -76,9 +98,11 @@ public class IcebergCommittable implements Serializable {
     public String toString() {
+        // every field is introduced by ", name=" so adjacent lists do not run together
         return "IcebergCommittable{"
                 + "dataFiles="
-                + dataFiles.size()
+                + dataFiles
                 + ", deleteFiles="
-                + deleteFiles.size()
+                + deleteFiles
+                + ", rewriteDataFiles="
+                + rewriteDataFiles
                 + '}';
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index b87081ad5..604f6cc6b 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.iceberg.tiering;
 import org.apache.fluss.lake.committer.BucketOffset;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
 import org.apache.fluss.metadata.TablePath;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -29,6 +30,7 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotUpdate;
@@ -39,6 +41,8 @@ import org.apache.iceberg.events.CreateSnapshotEvent;
 import org.apache.iceberg.events.Listener;
 import org.apache.iceberg.events.Listeners;
 import org.apache.iceberg.io.WriteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -56,6 +60,9 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** Implementation of {@link LakeCommitter} for Iceberg. */
 public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, 
IcebergCommittable> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergLakeCommitter.class);
+
     private static final String COMMITTER_USER = "commit-user";
 
     private final Catalog icebergCatalog;
@@ -87,6 +94,11 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
             for (DeleteFile deleteFile : writeResult.deleteFiles()) {
                 builder.addDeleteFile(deleteFile);
             }
+
+            RewriteDataFileResult rewriteDataFileResult = 
result.rewriteDataFileResult();
+            if (rewriteDataFileResult != null) {
+                builder.addRewriteDataFileResult(rewriteDataFileResult);
+            }
         }
 
         return builder.build();
@@ -99,52 +111,92 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
             // Refresh table to get latest metadata
             icebergTable.refresh();
 
+            SnapshotUpdate<?> snapshotUpdate;
             if (committable.getDeleteFiles().isEmpty()) {
                 // Simple append-only case: only data files, no delete files 
or compaction
                 AppendFiles appendFiles = icebergTable.newAppend();
                 for (DataFile dataFile : committable.getDataFiles()) {
                     appendFiles.appendFile(dataFile);
                 }
-
-                addFlussProperties(appendFiles, snapshotProperties);
-
-                appendFiles.commit();
+                snapshotUpdate = appendFiles;
             } else {
-                /**
-                 * Row delta validations are not needed for streaming changes 
that write equality
-                 * deletes. Equality deletes are applied to data in all 
previous sequence numbers,
-                 * so retries may push deletes further in the future, but do 
not affect correctness.
-                 * Position deletes committed to the table in this path are 
used only to delete rows
-                 * from data files that are being added in this commit. There 
is no way for data
-                 * files added along with the delete files to be concurrently 
removed, so there is
-                 * no need to validate the files referenced by the position 
delete files that are
-                 * being committed.
-                 */
+                /*
+                 Row delta validations are not needed for streaming changes 
that write equality
+                 deletes. Equality deletes are applied to data in all previous 
sequence numbers,
+                 so retries may push deletes further in the future, but do not 
affect correctness.
+                 Position deletes committed to the table in this path are used 
only to delete rows
+                 from data files that are being added in this commit. There is 
no way for data
+                 files added along with the delete files to be concurrently 
removed, so there is
+                 no need to validate the files referenced by the position 
delete files that are
+                 being committed.
+                */
                 RowDelta rowDelta = icebergTable.newRowDelta();
                 
Arrays.stream(committable.getDataFiles().stream().toArray(DataFile[]::new))
                         .forEach(rowDelta::addRows);
                 
Arrays.stream(committable.getDeleteFiles().stream().toArray(DeleteFile[]::new))
                         .forEach(rowDelta::addDeletes);
-                snapshotProperties.forEach(rowDelta::set);
-                rowDelta.commit();
+                snapshotUpdate = rowDelta;
             }
 
-            Long commitSnapshotId = currentCommitSnapshotId.get();
-            currentCommitSnapshotId.remove();
-
-            return checkNotNull(
-                    commitSnapshotId, "Iceberg committed snapshot id must be 
non-null.");
+            // commit written files
+            long snapshotId = commit(snapshotUpdate, snapshotProperties);
+
+            // If there are rewritten (compacted) files, commit them in a separate rewrite commit
+            List<RewriteDataFileResult> rewriteDataFileResults =
+                    committable.rewriteDataFileResults();
+            if (!rewriteDataFileResults.isEmpty()) {
+                Long rewriteCommitSnapshotId =
+                        commitRewrite(rewriteDataFileResults, 
snapshotProperties);
+                if (rewriteCommitSnapshotId != null) {
+                    snapshotId = rewriteCommitSnapshotId;
+                }
+            }
+            return checkNotNull(snapshotId, "Iceberg committed snapshot id 
must be non-null.");
         } catch (Exception e) {
             throw new IOException("Failed to commit to Iceberg table.", e);
         }
     }
 
-    private void addFlussProperties(
-            SnapshotUpdate<?> operation, Map<String, String> 
snapshotProperties) {
-        operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
+    private Long commitRewrite(
+            List<RewriteDataFileResult> rewriteDataFileResults,
+            Map<String, String> snapshotProperties) {
+        icebergTable.refresh();
+        RewriteFiles rewriteFiles = icebergTable.newRewrite();
+        for (RewriteDataFileResult rewriteDataFileResult : 
rewriteDataFileResults) {
+            
rewriteDataFileResult.addedDataFiles().forEach(rewriteFiles::addFile);
+            
rewriteDataFileResult.deletedDataFiles().forEach(rewriteFiles::deleteFile);
+        }
+        try {
+            return commit(rewriteFiles, snapshotProperties);
+        } catch (Exception e) {
+            List<String> rewriteAddedDataFiles =
+                    rewriteDataFileResults.stream()
+                            .flatMap(
+                                    rewriteDataFileResult ->
+                                            
rewriteDataFileResult.addedDataFiles().stream())
+                            .map(dataFile -> dataFile.path().toString())
+                            .collect(Collectors.toList());
+            LOG.error(
+                    "Failed to commit rewrite files to iceberg, delete rewrite 
added files {}.",
+                    rewriteAddedDataFiles,
+                    e);
+            // we need to abort new rewrite files
+            CatalogUtil.deleteFiles(icebergTable.io(), rewriteAddedDataFiles, 
"data file", true);
+            return null;
+        }
+    }
+
+    /**
+     * Sets the commit-user marker and the given snapshot properties on the update, commits it,
+     * and returns the snapshot id read from {@code currentCommitSnapshotId}.
+     *
+     * <p>The id is checked for null before it is unboxed, so a missing id fails with an
+     * explicit message rather than a bare unboxing error.
+     */
+    private long commit(SnapshotUpdate<?> snapshotUpdate, Map<String, String> snapshotProperties) {
+        // add snapshot properties
+        snapshotUpdate.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
         for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
-            operation.set(entry.getKey(), entry.getValue());
+            snapshotUpdate.set(entry.getKey(), entry.getValue());
         }
+        // do commit
+        snapshotUpdate.commit();
+        Long commitSnapshotId = currentCommitSnapshotId.get();
+        currentCommitSnapshotId.remove();
+        // validate while the value is still boxed; a check after unboxing can never fire
+        return checkNotNull(commitSnapshotId, "Iceberg committed snapshot id must be non-null.");
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
index c5765b7d1..da9d78f71 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -17,37 +17,54 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.lake.iceberg.maintenance.IcebergRewriteDataFiles;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
 import org.apache.fluss.lake.iceberg.tiering.writer.AppendOnlyTaskWriter;
 import org.apache.fluss.lake.iceberg.tiering.writer.DeltaTaskWriter;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
 
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /** Implementation of {@link LakeWriter} for Iceberg. */
 public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
 
+    protected static final Logger LOG = 
LoggerFactory.getLogger(IcebergLakeWriter.class);
+
+    private static final String AUTO_MAINTENANCE_KEY = 
"table.datalake.auto-maintenance";
+
     private final Catalog icebergCatalog;
     private final Table icebergTable;
     private final RecordWriter recordWriter;
+    private final boolean autoMaintenanceEnabled;
+
+    @Nullable private final ExecutorService compactionExecutor;
+    @Nullable private CompletableFuture<RewriteDataFileResult> 
compactionFuture;
 
     public IcebergLakeWriter(
             IcebergCatalogProvider icebergCatalogProvider, WriterInitContext 
writerInitContext)
@@ -55,33 +72,37 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
         this.icebergCatalog = icebergCatalogProvider.get();
         this.icebergTable = getTable(writerInitContext.tablePath());
 
-        // Create record writer based on table type
-        // For now, only supporting non-partitioned append-only tables
+        // Check auto-maintenance from table properties
+        this.autoMaintenanceEnabled =
+                Boolean.parseBoolean(
+                        
icebergTable.properties().getOrDefault(AUTO_MAINTENANCE_KEY, "false"));
+
+        // Create a record writer
         this.recordWriter = createRecordWriter(writerInitContext);
+
+        if (autoMaintenanceEnabled) {
+            this.compactionExecutor =
+                    Executors.newSingleThreadExecutor(
+                            new ExecutorThreadFactory(
+                                    "iceberg-compact-" + 
writerInitContext.tableBucket()));
+            scheduleCompaction(writerInitContext);
+        } else {
+            this.compactionExecutor = null;
+        }
     }
 
     private RecordWriter createRecordWriter(WriterInitContext 
writerInitContext) {
-        Schema schema = icebergTable.schema();
-        List<Integer> equalityFieldIds = new 
ArrayList<>(schema.identifierFieldIds());
-
-        // Get target file size from table properties
-        long targetFileSize = targetFileSize(icebergTable);
-        FileFormat format = fileFormat(icebergTable);
-        OutputFileFactory outputFileFactory =
-                OutputFileFactory.builderFor(
-                                icebergTable,
-                                writerInitContext.tableBucket().getBucket(),
-                                // task id always 0
-                                0)
-                        .format(format)
-                        .build();
-
+        List<Integer> equalityFieldIds =
+                new ArrayList<>(icebergTable.schema().identifierFieldIds());
+        TaskWriter<Record> taskWriter =
+                TaskWriterFactory.createTaskWriter(
+                        icebergTable,
+                        writerInitContext.partition(),
+                        writerInitContext.tableBucket().getBucket());
         if (equalityFieldIds.isEmpty()) {
-            return new AppendOnlyTaskWriter(
-                    icebergTable, writerInitContext, format, 
outputFileFactory, targetFileSize);
+            return new AppendOnlyTaskWriter(icebergTable, writerInitContext, 
taskWriter);
         } else {
-            return new DeltaTaskWriter(
-                    icebergTable, writerInitContext, format, 
outputFileFactory, targetFileSize);
+            return new DeltaTaskWriter(icebergTable, writerInitContext, 
taskWriter);
         }
     }
 
@@ -98,7 +119,12 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
     public IcebergWriteResult complete() throws IOException {
         try {
             WriteResult writeResult = recordWriter.complete();
-            return new IcebergWriteResult(writeResult);
+
+            RewriteDataFileResult rewriteDataFileResult = null;
+            if (compactionFuture != null) {
+                rewriteDataFileResult = compactionFuture.get();
+            }
+            return new IcebergWriteResult(writeResult, rewriteDataFileResult);
         } catch (Exception e) {
             throw new IOException("Failed to complete Iceberg write.", e);
         }
@@ -107,6 +133,15 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
     @Override
     public void close() throws IOException {
         try {
+            if (compactionFuture != null && !compactionFuture.isDone()) {
+                compactionFuture.cancel(true);
+            }
+
+            if (compactionExecutor != null) {
+                // awaitTermination can only succeed after a shutdown request; without one
+                // this would always wait the full timeout and leave the worker thread alive.
+                // shutdownNow also interrupts a compaction that is still running, which
+                // CompletableFuture.cancel does not do.
+                compactionExecutor.shutdownNow();
+                if (!compactionExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
+                    LOG.warn("Fail to close compactionExecutor.");
+                }
+            }
+
             if (recordWriter != null) {
                 recordWriter.close();
             }
@@ -126,19 +161,39 @@ public class IcebergLakeWriter implements 
LakeWriter<IcebergWriteResult> {
         }
     }
 
-    private static FileFormat fileFormat(Table icebergTable) {
-        String formatString =
-                PropertyUtil.propertyAsString(
-                        icebergTable.properties(),
-                        TableProperties.DEFAULT_FILE_FORMAT,
-                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.fromString(formatString);
+    /**
+     * Kicks off an asynchronous rewrite (compaction) of Iceberg data files and stores the pending
+     * result in {@code compactionFuture}.
+     *
+     * <p>The rewriter is built from {@code icebergTable}, the partition and table bucket of
+     * {@code writerInitContext}, and the size returned by {@code compactionTargetSize}; the task
+     * runs on {@code compactionExecutor}.
+     *
+     * <p>An {@link Exception} raised while rewriting is logged and turned into a {@code null}
+     * result, so a {@code null} value from the future means "no rewrite result available".
+     *
+     * @param writerInitContext supplies the partition and table bucket handed to the rewriter
+     */
+    private void scheduleCompaction(WriterInitContext writerInitContext) {
+        compactionFuture =
+                CompletableFuture.supplyAsync(
+                        () -> {
+                            try {
+                                Table table = icebergTable;
+                                IcebergRewriteDataFiles rewriter =
+                                        new IcebergRewriteDataFiles(
+                                                        table,
+                                                        
writerInitContext.partition(),
+                                                        
writerInitContext.tableBucket())
+                                                
.targetSizeInBytes(compactionTargetSize(table));
+                                return rewriter.execute();
+                            } catch (Exception e) {
+                                LOG.info("Fail to compact iceberg data 
files.", e);
+                                // Swallow and return null to avoid failing 
the write path
+                                return null;
+                            }
+                        },
+                        compactionExecutor);
     }
 
-    private static long targetFileSize(Table icebergTable) {
-        return PropertyUtil.propertyAsLong(
-                icebergTable.properties(),
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    /**
+     * Computes the target size used for compaction as the smaller of two table properties:
+     * {@code TableProperties.SPLIT_SIZE} and {@code TableProperties.WRITE_TARGET_FILE_SIZE_BYTES}.
+     * Each lookup is given the matching {@code *_DEFAULT} constant as its default value.
+     *
+     * @param icebergTable the table whose properties are read
+     * @return the minimum of the two sizes, in bytes
+     */
+    private static long compactionTargetSize(Table icebergTable) {
+        long splitSize =
+                PropertyUtil.propertyAsLong(
+                        icebergTable.properties(),
+                        TableProperties.SPLIT_SIZE,
+                        TableProperties.SPLIT_SIZE_DEFAULT);
+        long targetFileSize =
+                PropertyUtil.propertyAsLong(
+                        icebergTable.properties(),
+                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+                        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+        return Math.min(splitSize, targetFileSize);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
index cd2825ce5..2b36c87c2 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResult.java
@@ -17,8 +17,12 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
 import org.apache.iceberg.io.WriteResult;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 
 /** The write result of Iceberg lake writer to pass to commiter to commit. */
@@ -26,16 +30,27 @@ public class IcebergWriteResult implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    // the normal result of tiering writing to iceberg
     private final WriteResult writeResult;
 
-    public IcebergWriteResult(WriteResult writeResult) {
+    // the rewrite result
+    @Nullable private final RewriteDataFileResult rewriteDataFileResult;
+
+    /**
+     * Creates a write result that carries the tiering write output and, optionally, the outcome
+     * of a data-file rewrite.
+     *
+     * @param writeResult the result of tiering writes to Iceberg
+     * @param rewriteDataFileResult the rewrite result, or {@code null} when there is none
+     */
+    public IcebergWriteResult(
+            WriteResult writeResult, @Nullable RewriteDataFileResult 
rewriteDataFileResult) {
         this.writeResult = writeResult;
+        this.rewriteDataFileResult = rewriteDataFileResult;
     }
 
     public WriteResult getWriteResult() {
         return writeResult;
     }
 
+    /**
+     * Returns the rewrite result supplied at construction time.
+     *
+     * @return the rewrite result, or {@code null} if none was supplied
+     */
+    @Nullable
+    public RewriteDataFileResult rewriteDataFileResult() {
+        return rewriteDataFileResult;
+    }
+
     @Override
     public String toString() {
         return "IcebergWriteResult{"
@@ -43,6 +58,9 @@ public class IcebergWriteResult implements Serializable {
                 + writeResult.dataFiles().length
                 + ", deleteFiles="
                 + writeResult.deleteFiles().length
+                + (rewriteDataFileResult != null
+                        ? (", rewriteDataFiles=" + rewriteDataFileResult)
+                        : "")
                 + '}';
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
index 26bb02a77..fe48e3af2 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
@@ -18,11 +18,16 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.utils.InstantiationUtils;
 
 import org.apache.iceberg.io.WriteResult;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 
 /** Serializer for {@link IcebergWriteResult}. */
@@ -37,18 +42,64 @@ public class IcebergWriteResultSerializer implements 
SimpleVersionedSerializer<I
 
+    /**
+     * Serializes the write result and, when present, the rewrite result into one byte array.
+     *
+     * <p>Layout, in order: an {@code int} length of the WriteResult payload, the payload itself,
+     * a {@code boolean} flag telling whether a rewrite result follows and, only when it does, an
+     * {@code int} length followed by the rewrite payload. Both payloads come from
+     * {@code InstantiationUtils.serializeObject}.
+     *
+     * @param icebergWriteResult the result to serialize
+     * @return the framed bytes
+     * @throws IOException if object serialization or writing to the stream fails
+     */
     @Override
     public byte[] serialize(IcebergWriteResult icebergWriteResult) throws 
IOException {
-        return 
InstantiationUtils.serializeObject(icebergWriteResult.getWriteResult());
+        byte[] writeResultBytes =
+                
InstantiationUtils.serializeObject(icebergWriteResult.getWriteResult());
+
+        RewriteDataFileResult rewriteDataFileResult = 
icebergWriteResult.rewriteDataFileResult();
+        byte[] rewriteResultBytes =
+                rewriteDataFileResult == null
+                        ? null
+                        : 
InstantiationUtils.serializeObject(rewriteDataFileResult);
+
+        try (ByteArrayOutputStream baos = new 
ByteArrayOutputStream(writeResultBytes.length);
+                DataOutputStream dos = new DataOutputStream(baos)) {
+            // Frame the WriteResult: length prefix, then the serialized bytes
+            dos.writeInt(writeResultBytes.length);
+            dos.write(writeResultBytes);
+
+            // Optional rewrite result: presence flag, then a length-prefixed payload if present
+            boolean hasRewrite = rewriteResultBytes != null;
+            dos.writeBoolean(hasRewrite);
+            if (hasRewrite) {
+                dos.writeInt(rewriteResultBytes.length);
+                dos.write(rewriteResultBytes);
+            }
+            return baos.toByteArray();
+        }
     }
 
+    /**
+     * Reads back the framed bytes: a length-prefixed WriteResult payload, a {@code boolean}
+     * presence flag and, when the flag is set, a length-prefixed rewrite-result payload. Each
+     * payload is turned into an object with {@code InstantiationUtils.deserializeObject} using
+     * this class's class loader.
+     *
+     * <p>NOTE(review): {@code version} is not consulted in this method, while the removed line
+     * shows the earlier layout was the bare serialized WriteResult without a length prefix.
+     * Confirm that the serializer version is bumped or that no old-layout bytes can reach here.
+     *
+     * @param version the serializer version of {@code serialized}; unused here
+     * @param serialized the framed bytes
+     * @return the write result, with a {@code null} rewrite result when the flag is not set
+     * @throws IOException if a length prefix is negative or larger than the input, if the input
+     *     ends early, or if a payload class cannot be found
+     */
     @Override
     public IcebergWriteResult deserialize(int version, byte[] serialized) 
throws IOException {
         WriteResult writeResult;
-        try {
+        RewriteDataFileResult rewriteDataFileResult;
+        try (DataInputStream dis = new DataInputStream(new 
ByteArrayInputStream(serialized))) {
+            int wrLen = dis.readInt();
+            // Reject impossible lengths before allocating the buffer
+            if (wrLen < 0 || wrLen > serialized.length) {
+                throw new IOException(
+                        "Corrupted serialization: invalid WriteResult length " 
+ wrLen);
+            }
+            byte[] wrBytes = new byte[wrLen];
+            dis.readFully(wrBytes);
             writeResult =
-                    InstantiationUtils.deserializeObject(serialized, 
getClass().getClassLoader());
+                    InstantiationUtils.deserializeObject(wrBytes, 
getClass().getClassLoader());
+
+            boolean hasCompaction = dis.readBoolean();
+            if (hasCompaction) {
+                int crLen = dis.readInt();
+                // Same sanity check as above for the rewrite payload
+                if (crLen < 0 || crLen > serialized.length) {
+                    throw new IOException(
+                            "Corrupted serialization: invalid compactionResult 
length " + crLen);
+                }
+                byte[] crBytes = new byte[crLen];
+                dis.readFully(crBytes);
+                rewriteDataFileResult =
+                        InstantiationUtils.deserializeObject(crBytes, 
getClass().getClassLoader());
+            } else {
+                rewriteDataFileResult = null;
+            }
         } catch (ClassNotFoundException e) {
             throw new IOException(e);
         }
-        return new IcebergWriteResult(writeResult);
+        return new IcebergWriteResult(writeResult, rewriteDataFileResult);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
index 6f204ca30..53ccb3a80 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java
@@ -21,12 +21,8 @@ import org.apache.fluss.lake.iceberg.tiering.RecordWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.record.LogRecord;
 
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 
 /** A {@link RecordWriter} to write to Iceberg's append-only table. */
@@ -35,35 +31,14 @@ public class AppendOnlyTaskWriter extends RecordWriter {
+    /**
+     * Wraps an already constructed Iceberg task writer instead of building one internally.
+     *
+     * @param icebergTable the target table; its schema is passed to the parent constructor
+     * @param writerInitContext supplies the Fluss row type and the table bucket
+     * @param taskWriter the Iceberg writer that records are delegated to
+     */
     public AppendOnlyTaskWriter(
             Table icebergTable,
             WriterInitContext writerInitContext,
-            FileFormat format,
-            OutputFileFactory outputFileFactory,
-            long targetFileSize) {
+            TaskWriter<Record> taskWriter) {
         super(
-                createTaskWriter(
-                        icebergTable, writerInitContext, format, 
outputFileFactory, targetFileSize),
+                taskWriter,
                 icebergTable.schema(),
                 writerInitContext.schema().getRowType(),
                 writerInitContext.tableBucket());
     }
 
-    private static TaskWriter<Record> createTaskWriter(
-            Table icebergTable,
-            WriterInitContext writerInitContext,
-            FileFormat format,
-            OutputFileFactory outputFileFactory,
-            long targetFileSize) {
-        FileAppenderFactory<Record> fileAppenderFactory =
-                new GenericAppenderFactory(icebergTable.schema(), 
icebergTable.spec());
-        return new GenericRecordAppendOnlyWriter(
-                icebergTable,
-                format,
-                fileAppenderFactory,
-                outputFileFactory,
-                icebergTable.io(),
-                targetFileSize,
-                writerInitContext);
-    }
-
     @Override
     public void write(LogRecord record) throws Exception {
         flussRecordAsIcebergRecord.setFlussRecord(record);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
index 677ae1e67..cbab8fdf9 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java
@@ -21,69 +21,24 @@ import org.apache.fluss.lake.iceberg.tiering.RecordWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.record.LogRecord;
 
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.io.TaskWriter;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /** A {@link RecordWriter} to write to Iceberg's primary-key table. */
 public class DeltaTaskWriter extends RecordWriter {
 
+    /**
+     * Wraps an already constructed Iceberg task writer instead of building one internally.
+     *
+     * <p>{@code write} casts the task writer to {@code GenericRecordDeltaWriter}, so the
+     * instance passed here must be of that type.
+     *
+     * @param icebergTable the target table; its schema is passed to the parent constructor
+     * @param writerInitContext supplies the Fluss row type and the table bucket
+     * @param taskWriter the Iceberg delta writer that records are delegated to
+     */
     public DeltaTaskWriter(
             Table icebergTable,
             WriterInitContext writerInitContext,
-            FileFormat format,
-            OutputFileFactory outputFileFactory,
-            long targetFileSize) {
+            TaskWriter<Record> taskWriter) {
         super(
-                createTaskWriter(
-                        icebergTable, format, outputFileFactory, 
targetFileSize, writerInitContext),
+                taskWriter,
                 icebergTable.schema(),
                 writerInitContext.schema().getRowType(),
                 writerInitContext.tableBucket());
     }
 
-    private static TaskWriter<Record> createTaskWriter(
-            Table icebergTable,
-            FileFormat format,
-            OutputFileFactory outputFileFactory,
-            long targetFileSize,
-            WriterInitContext writerInitContext) {
-        int[] equalityFieldIds =
-                icebergTable.schema().identifierFieldIds().stream()
-                        .mapToInt(Integer::intValue)
-                        .toArray();
-        FileAppenderFactory<Record> appenderFactory =
-                new GenericAppenderFactory(
-                        icebergTable.schema(),
-                        icebergTable.spec(),
-                        equalityFieldIds,
-                        icebergTable.schema(),
-                        null);
-
-        List<String> columns = new ArrayList<>();
-        for (Integer fieldId : icebergTable.schema().identifierFieldIds()) {
-            columns.add(icebergTable.schema().findField(fieldId).name());
-        }
-        Schema deleteSchema = icebergTable.schema().select(columns);
-        return new GenericRecordDeltaWriter(
-                icebergTable,
-                deleteSchema,
-                format,
-                appenderFactory,
-                outputFileFactory,
-                icebergTable.io(),
-                targetFileSize,
-                writerInitContext);
-    }
-
     @Override
     public void write(LogRecord record) throws Exception {
         GenericRecordDeltaWriter deltaWriter = (GenericRecordDeltaWriter) 
taskWriter;
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
index 8d5130304..0933ccb26 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordAppendOnlyWriter.java
@@ -18,8 +18,6 @@
 
 package org.apache.fluss.lake.iceberg.tiering.writer;
 
-import org.apache.fluss.lake.writer.WriterInitContext;
-
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.data.Record;
@@ -28,6 +26,8 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 import static 
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
@@ -44,14 +44,10 @@ public class GenericRecordAppendOnlyWriter extends 
BaseTaskWriter<Record> {
             OutputFileFactory fileFactory,
             FileIO io,
             long targetFileSize,
-            WriterInitContext writerInitContext) {
+            @Nullable String partitionName,
+            int bucket) {
         super(icebergTable.spec(), format, appenderFactory, fileFactory, io, 
targetFileSize);
-        currentWriter =
-                new RollingFileWriter(
-                        toPartition(
-                                icebergTable,
-                                writerInitContext.partition(),
-                                writerInitContext.tableBucket().getBucket()));
+        currentWriter = new RollingFileWriter(toPartition(icebergTable, 
partitionName, bucket));
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
index d327ef176..22abafb3a 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/GenericRecordDeltaWriter.java
@@ -17,8 +17,6 @@
 
 package org.apache.fluss.lake.iceberg.tiering.writer;
 
-import org.apache.fluss.lake.writer.WriterInitContext;
-
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.Schema;
@@ -30,6 +28,8 @@ import org.apache.iceberg.io.FileAppenderFactory;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.OutputFileFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 import static 
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toPartition;
@@ -46,14 +46,12 @@ class GenericRecordDeltaWriter extends 
BaseTaskWriter<Record> {
             OutputFileFactory fileFactory,
             FileIO io,
             long targetFileSize,
-            WriterInitContext writerInitContext) {
+            @Nullable String partition,
+            int bucket) {
         super(icebergTable.spec(), format, appenderFactory, fileFactory, io, 
targetFileSize);
         this.deltaWriter =
                 new GenericEqualityDeltaWriter(
-                        toPartition(
-                                icebergTable,
-                                writerInitContext.partition(),
-                                writerInitContext.tableBucket().getBucket()),
+                        toPartition(icebergTable, partition, bucket),
                         icebergTable.schema(),
                         deleteSchema);
     }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java
new file mode 100644
index 000000000..2e5cf1223
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/TaskWriterFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering.writer;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * A factory to create Iceberg {@link TaskWriter}.
+ *
+ * <p>The kind of writer depends on the table schema: a schema without identifier fields gets a
+ * {@link GenericRecordAppendOnlyWriter}, otherwise a {@link GenericRecordDeltaWriter} is created.
+ */
+public class TaskWriterFactory {
+
+    /**
+     * Creates a task writer for one partition/bucket of the given table.
+     *
+     * @param table the Iceberg table to write to; schema, spec, IO and properties are read from it
+     * @param partition the partition name handed to the writer, or {@code null}
+     * @param bucket the bucket handed to the writer and to the output file factory
+     * @return an append-only writer when the schema has no identifier fields, a delta writer
+     *     otherwise
+     */
+    public static TaskWriter<Record> createTaskWriter(
+            Table table, @Nullable String partition, int bucket) {
+        Schema schema = table.schema();
+        int[] equalityFieldIds =
+                
schema.identifierFieldIds().stream().mapToInt(Integer::intValue).toArray();
+
+        // Get target file size from table properties
+        long targetFileSize = targetFileSize(table);
+        FileFormat format = fileFormat(table);
+        OutputFileFactory outputFileFactory =
+                OutputFileFactory.builderFor(
+                                table,
+                                bucket,
+                                // task id always 0
+                                0)
+                        .format(format)
+                        .build();
+
+        // No identifier fields: plain appends, no equality deletes needed
+        if (equalityFieldIds.length == 0) {
+            FileAppenderFactory<Record> fileAppenderFactory =
+                    new GenericAppenderFactory(schema, table.spec());
+            return new GenericRecordAppendOnlyWriter(
+                    table,
+                    format,
+                    fileAppenderFactory,
+                    outputFileFactory,
+                    table.io(),
+                    targetFileSize,
+                    partition,
+                    bucket);
+
+        } else {
+            FileAppenderFactory<Record> appenderFactory =
+                    new GenericAppenderFactory(
+                            schema, table.spec(), equalityFieldIds, schema, 
null);
+            // The delete schema is the projection of the schema onto its identifier fields
+            List<String> columns = new ArrayList<>();
+            for (Integer fieldId : schema.identifierFieldIds()) {
+                columns.add(schema.findField(fieldId).name());
+            }
+            Schema deleteSchema = schema.select(columns);
+            return new GenericRecordDeltaWriter(
+                    table,
+                    deleteSchema,
+                    format,
+                    appenderFactory,
+                    outputFileFactory,
+                    table.io(),
+                    targetFileSize,
+                    partition,
+                    bucket);
+        }
+    }
+
+    /** Reads the file format from {@code TableProperties.DEFAULT_FILE_FORMAT}. */
+    private static FileFormat fileFormat(Table icebergTable) {
+        String formatString =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.fromString(formatString);
+    }
+
+    /** Reads the target file size from {@code WRITE_TARGET_FILE_SIZE_BYTES}. */
+    private static long targetFileSize(Table icebergTable) {
+        return PropertyUtil.propertyAsLong(
+                icebergTable.properties(),
+                WRITE_TARGET_FILE_SIZE_BYTES,
+                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
index 2b53fe7a3..af20ff594 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -19,14 +19,19 @@ package org.apache.fluss.lake.iceberg.utils;
 
 import org.apache.fluss.metadata.TablePath;
 
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 import static 
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
 
 /** Utility class for static conversions between Fluss and Iceberg types. */
@@ -52,4 +57,26 @@ public class IcebergConversions {
         partitionKey.set(pos, bucket);
         return partitionKey;
     }
+
+    /**
+     * Builds a filter that matches one partition/bucket of the table.
+     *
+     * <p>The result is a conjunction of equality predicates. When {@code partitionName} is not
+     * {@code null} it is split on {@code PARTITION_SPEC_SEPARATOR} and the i-th value is compared,
+     * as a string, with the name of the i-th field of {@code table.spec()}. The partition field
+     * that follows the consumed ones is then compared with {@code bucket}.
+     *
+     * <p>NOTE(review): this presumes the spec lists the Fluss partition fields first, in the same
+     * order as the values in {@code partitionName}, directly followed by the bucket field;
+     * confirm against how the table spec is created.
+     *
+     * @param table the table whose partition spec provides the field names
+     * @param partitionName the partition values joined by the separator, or {@code null}
+     * @param bucket the bucket to match
+     * @return the combined filter expression
+     */
+    public static Expression toFilterExpression(
+            Table table, @Nullable String partitionName, int bucket) {
+        List<PartitionField> partitionFields = table.spec().fields();
+        Expression expression = Expressions.alwaysTrue();
+        int partitionIndex = 0;
+        if (partitionName != null) {
+            // split() takes a regex, so the separator is escaped with a backslash
+            String[] partitionArr = partitionName.split("\\" + 
PARTITION_SPEC_SEPARATOR);
+            for (String partition : partitionArr) {
+                expression =
+                        Expressions.and(
+                                expression,
+                                Expressions.equal(
+                                        
partitionFields.get(partitionIndex++).name(), partition));
+            }
+        }
+        // partitionIndex now points at the field right after the consumed partition values
+        expression =
+                Expressions.and(
+                        expression,
+                        
Expressions.equal(partitionFields.get(partitionIndex).name(), bucket));
+        return expression;
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java
new file mode 100644
index 000000000..c496c6fd8
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/iceberg/data/IcebergGenericReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.data;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableScan;
+
+/**
+ * A reader for Iceberg records. It extends Iceberg's {@link GenericReader} so that the
+ * {@link #open(FileScanTask)} method becomes visible to Fluss.
+ *
+ * <p>NOTE(review): the class is declared in the {@code org.apache.iceberg.data} package,
+ * presumably because the parent class or its method is not accessible from other packages;
+ * confirm against the Iceberg version in use.
+ */
+public class IcebergGenericReader extends GenericReader {
+
+    /** Passes both arguments unchanged to the {@link GenericReader} constructor. */
+    public IcebergGenericReader(TableScan scan, boolean reuseContainers) {
+        super(scan, reuseContainers);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
new file mode 100644
index 000000000..264588bee
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.maintenance;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteFiles;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test to verify compaction via {@link IcebergRewriteDataFiles}. */
+class IcebergRewriteTest {
+
+    private @TempDir File tempWarehouseDir;
+    private Catalog icebergCatalog;
+
+    /** Creates a catalog of type "hadoop" whose warehouse is the per-test temp directory. */
+    @BeforeEach
+    void setUp() {
+        Configuration configuration = new Configuration();
+        configuration.setString("warehouse", "file://" + tempWarehouseDir);
+        configuration.setString("type", "hadoop");
+        configuration.setString("name", "test");
+        IcebergCatalogProvider provider = new 
IcebergCatalogProvider(configuration);
+        icebergCatalog = provider.get();
+    }
+
+    /**
+     * Exercises the rewrite on a single bucket: with 2 data files no rewrite is produced; after
+     * more files are appended the rewrite replaces 4 files with 1; a further attempt produces
+     * nothing; and the row count is unchanged by the rewrite.
+     */
+    @Test
+    void testSingleBucketRewrite() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "compact_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+        appendTinyFilesWithRowsAndBucket(icebergTable, 2, 3, 1000, bucket);
+        icebergTable.refresh();
+
+        int filesBefore = countDataFiles(icebergTable);
+        // We expect exactly 2 files for this no-op compaction scenario,
+        // since a rewrite only happens once there are at least 3 small files
+        assertThat(filesBefore).isEqualTo(2);
+
+        IcebergRewriteDataFiles icebergRewriteDataFiles =
+                createIcebergRewriteDataFiles(icebergTable, bucket);
+
+        // run the rewrite; the result should be null because nothing is rewritten
+        RewriteDataFileResult rewriteDataFileResult = 
icebergRewriteDataFiles.execute();
+        assertThat(rewriteDataFileResult).isNull();
+
+        // append more files; now a rewrite should happen
+        appendTinyFilesWithRowsAndBucket(icebergTable, 2, 3, 2000, bucket);
+
+        long rowsBefore = countRows(icebergTable);
+
+        icebergRewriteDataFiles = createIcebergRewriteDataFiles(icebergTable, 
bucket);
+        rewriteDataFileResult = icebergRewriteDataFiles.execute();
+
+        // verify the rewrite result
+        assertThat(rewriteDataFileResult).isNotNull();
+        assertThat(rewriteDataFileResult.deletedDataFiles()).hasSize(4);
+        assertThat(rewriteDataFileResult.addedDataFiles()).hasSize(1);
+
+        // commit
+        commitRewrite(icebergTable, rewriteDataFileResult);
+        icebergTable.refresh();
+        // only one file now
+        assertThat(countDataFiles(icebergTable)).isEqualTo(1);
+
+        // try to compact again; nothing should be rewritten
+        icebergRewriteDataFiles = createIcebergRewriteDataFiles(icebergTable, 
bucket);
+        rewriteDataFileResult = icebergRewriteDataFiles.execute();
+        assertThat(rewriteDataFileResult).isNull();
+
+        // compaction must not change the row count
+        long rowsAfter = countRows(icebergTable);
+        assertThat(rowsAfter).isEqualTo(rowsBefore);
+    }
+
+    /**
+     * Checks that a rewrite is scoped to its bucket: rewriting bucket 0 reduces its files from
+     * 3 to 1 while bucket 1 keeps its 3 files, and rewriting bucket 1 afterwards reduces it to 1.
+     */
+    @Test
+    void testMultipleBucketRewrite() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "rewrite_bucket_scoped");
+        createTable(tablePath);
+
+        Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+        // Seed bucket 0: 3 tiny files, bucket 1: 3 tiny files
+        appendTinyFilesWithRowsAndBucket(table, 3, 1, 4000, 0);
+        appendTinyFilesWithRowsAndBucket(table, 3, 1, 5000, 1);
+        table.refresh();
+
+        int filesBeforeBucket0 = countFilesForBucket(table, 0);
+        int filesBeforeBucket1 = countFilesForBucket(table, 1);
+        assertThat(filesBeforeBucket0).isEqualTo(3);
+        assertThat(filesBeforeBucket1).isEqualTo(3);
+
+        //  rewrite only bucket 0
+        RewriteDataFileResult rewriteDataFileResult =
+                createIcebergRewriteDataFiles(table, 0).execute();
+        assertThat(rewriteDataFileResult).isNotNull();
+        commitRewrite(table, rewriteDataFileResult);
+        table.refresh();
+
+        // bucket 0 is compacted to one file, bucket 1 is untouched
+        int filesAfterBucket0 = countFilesForBucket(table, 0);
+        int filesAfterBucket1 = countFilesForBucket(table, 1);
+        assertThat(filesAfterBucket0).isEqualTo(1);
+        assertThat(filesAfterBucket1).isEqualTo(3);
+
+        //  rewrite only bucket 1
+        RewriteDataFileResult res1 = createIcebergRewriteDataFiles(table, 
1).execute();
+        assertThat(res1).isNotNull();
+        commitRewrite(table, res1);
+        table.refresh();
+        filesAfterBucket1 = countFilesForBucket(table, 1);
+        assertThat(filesAfterBucket1).isEqualTo(1);
+    }
+
+    /**
+     * Refreshes the table and creates a rewriter for the given bucket, with a {@code null}
+     * partition and a table bucket built with table id 0.
+     */
+    private IcebergRewriteDataFiles createIcebergRewriteDataFiles(Table table, 
int bucket) {
+        table.refresh();
+        return new IcebergRewriteDataFiles(table, null, new TableBucket(0, 
bucket));
+    }
+
+    /**
+     * Commits a rewrite result to {@code table} as one {@link RewriteFiles} operation: every file
+     * listed as deleted is registered for removal, every file listed as added is registered for
+     * addition, and the operation is then committed.
+     */
+    private void commitRewrite(Table table, RewriteDataFileResult rewriteDataFileResult) {
+        table.refresh();
+        RewriteFiles pendingRewrite = table.newRewrite();
+        for (DataFile replaced : rewriteDataFileResult.deletedDataFiles()) {
+            pendingRewrite.deleteFile(replaced);
+        }
+        for (DataFile compacted : rewriteDataFileResult.addedDataFiles()) {
+            pendingRewrite.addFile(compacted);
+        }
+        pendingRewrite.commit();
+    }
+
+    /**
+     * Creates the Iceberg table for {@code tablePath}, creating its namespace first when it does
+     * not exist yet.
+     *
+     * <p>The schema has three optional user columns ({@code c1} int, {@code c2} and {@code c3}
+     * string) followed by the required bucket, offset and timestamp columns; the table is
+     * identity-partitioned on the bucket column.
+     */
+    private void createTable(TablePath tablePath) {
+        String database = tablePath.getDatabaseName();
+
+        // Make sure the target namespace exists before creating the table in it.
+        SupportsNamespaces namespaceCatalog = (SupportsNamespaces) icebergCatalog;
+        Namespace dbNamespace = Namespace.of(database);
+        boolean namespaceMissing = !namespaceCatalog.namespaceExists(dbNamespace);
+        if (namespaceMissing) {
+            namespaceCatalog.createNamespace(dbNamespace);
+        }
+
+        Types.NestedField c1 = Types.NestedField.optional(1, "c1", Types.IntegerType.get());
+        Types.NestedField c2 = Types.NestedField.optional(2, "c2", Types.StringType.get());
+        Types.NestedField c3 = Types.NestedField.optional(3, "c3", Types.StringType.get());
+        Types.NestedField bucketColumn =
+                Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get());
+        Types.NestedField offsetColumn =
+                Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get());
+        Types.NestedField timestampColumn =
+                Types.NestedField.required(
+                        6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone());
+        Schema tableSchema =
+                new Schema(c1, c2, c3, bucketColumn, offsetColumn, timestampColumn);
+
+        PartitionSpec bucketPartitioning =
+                PartitionSpec.builderFor(tableSchema).identity(BUCKET_COLUMN_NAME).build();
+        TableIdentifier identifier = TableIdentifier.of(database, tablePath.getTableName());
+        icebergCatalog.createTable(identifier, tableSchema, bucketPartitioning);
+    }
+
+    /**
+     * Returns the number of {@link FileScanTask}s planned by an unfiltered scan of {@code table}.
+     *
+     * @throws IOException if closing the planned-task iterable fails
+     */
+    private static int countDataFiles(Table table) throws IOException {
+        try (CloseableIterable<FileScanTask> plannedTasks = table.newScan().planFiles()) {
+            int fileCount = 0;
+            for (FileScanTask unused : plannedTasks) {
+                fileCount += 1;
+            }
+            return fileCount;
+        }
+    }
+
+    /**
+     * Counts the records returned by a full generic read of {@code table}.
+     *
+     * <p>Failures raised while scanning propagate unchanged. Only the checked {@link IOException}
+     * (which the reader's {@code close()} may raise) is translated into an unchecked exception,
+     * so callers do not need a {@code throws} clause.
+     *
+     * @param table the Iceberg table to read
+     * @return the number of records read
+     * @throws java.io.UncheckedIOException if releasing the reader fails with an I/O error
+     */
+    private static long countRows(Table table) {
+        long cnt = 0L;
+        try (CloseableIterable<Record> records = IcebergGenerics.read(table).build()) {
+            for (Record ignored : records) {
+                cnt++;
+            }
+        } catch (IOException e) {
+            // Catch only the checked exception; a blanket catch would hide the real type of
+            // unchecked scan failures behind a generic RuntimeException wrapper.
+            throw new java.io.UncheckedIOException(e);
+        }
+        return cnt;
+    }
+
+    /**
+     * Returns the number of {@link FileScanTask}s planned by a scan of {@code table} that is
+     * filtered to rows whose bucket column equals {@code bucket}.
+     *
+     * @throws IOException if closing the planned-task iterable fails
+     */
+    private static int countFilesForBucket(Table table, int bucket) throws IOException {
+        try (CloseableIterable<FileScanTask> bucketTasks =
+                table.newScan().filter(Expressions.equal(BUCKET_COLUMN_NAME, bucket)).planFiles()) {
+            int matchingFiles = 0;
+            for (FileScanTask unused : bucketTasks) {
+                matchingFiles += 1;
+            }
+            return matchingFiles;
+        }
+    }
+
+    /**
+     * Writes {@code files} small data files for one bucket and commits them to {@code table} in a
+     * single append.
+     *
+     * <p>Each file holds {@code rowsPerFile} rows; the first file starts at {@code baseOffset} and
+     * every following file starts {@code rowsPerFile} offsets after the previous one.
+     */
+    private static void appendTinyFilesWithRowsAndBucket(
+            Table table, int files, int rowsPerFile, int baseOffset, int bucket) throws Exception {
+        // Write all files first so nothing is staged if any single write fails.
+        List<DataFile> writtenFiles = new ArrayList<>(files);
+        int nextStartOffset = baseOffset;
+        for (int written = 0; written < files; written++) {
+            writtenFiles.add(writeTinyDataFile(table, rowsPerFile, nextStartOffset, bucket));
+            nextStartOffset += rowsPerFile;
+        }
+
+        AppendFiles pendingAppend = table.newAppend();
+        writtenFiles.forEach(pendingAppend::appendFile);
+        pendingAppend.commit();
+    }
+
+    /**
+     * Writes {@code rows} records for one bucket through a task writer and returns the data file
+     * it produced.
+     *
+     * <p>Row {@code i} is populated with {@code c1 = i}, {@code c2 = "v_" + i}, {@code c3 = "g"},
+     * the given bucket, offset {@code startOffset + i} and the current UTC time. The
+     * {@code checkState} precondition fails unless the writer reports exactly one data file.
+     */
+    private static DataFile writeTinyDataFile(Table table, int rows, int startOffset, int bucket)
+            throws Exception {
+        try (TaskWriter<Record> writer =
+                TaskWriterFactory.createTaskWriter(table, null, bucket)) {
+            int row = 0;
+            while (row < rows) {
+                Record record = org.apache.iceberg.data.GenericRecord.create(table.schema());
+                record.setField("c1", row);
+                record.setField("c2", "v_" + row);
+                record.setField("c3", "g");
+                record.setField(BUCKET_COLUMN_NAME, bucket);
+                long offset = (long) (startOffset + row);
+                record.setField(OFFSET_COLUMN_NAME, offset);
+                java.time.OffsetDateTime writtenAt =
+                        java.time.OffsetDateTime.now(java.time.ZoneOffset.UTC);
+                record.setField(TIMESTAMP_COLUMN_NAME, writtenAt);
+                writer.write(record);
+                row++;
+            }
+
+            DataFile[] produced = writer.dataFiles();
+            checkState(produced.length == 1);
+            return produced[0];
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
new file mode 100644
index 000000000..207714305
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergWriteResultSerializerTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering;
+
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Round-trip tests for {@link IcebergWriteResultSerializer}. */
+class IcebergWriteResultSerializerTest {
+
+    private IcebergWriteResultSerializer serializer;
+    private Schema schema;
+    private PartitionSpec spec;
+    private GenericRecord partitionData;
+
+    /**
+     * Creates a fresh serializer plus the fixtures shared by the file builders: a two-column
+     * schema, an identity partition spec on {@code category}, and partition data with
+     * {@code category = "A"}.
+     */
+    @BeforeEach
+    void setUp() {
+        serializer = new IcebergWriteResultSerializer();
+
+        Types.NestedField idField =
+                Types.NestedField.required(1, "id", Types.IntegerType.get());
+        Types.NestedField categoryField =
+                Types.NestedField.optional(2, "category", Types.StringType.get());
+        schema = new Schema(idField, categoryField);
+
+        spec = PartitionSpec.builderFor(schema).identity("category").build();
+        partitionData = GenericRecord.create(spec.partitionType());
+        partitionData.setField("category", "A");
+    }
+
+    /**
+     * Serializes and deserializes a write result holding one data file and one delete file,
+     * first without and then with a rewrite result attached, and compares the string form of the
+     * deserialized object with that of the original.
+     */
+    @Test
+    void testSerializeAndDeserializeWithAllFiles() throws IOException {
+        DataFile dataFile = createDataFile("/data/file1.parquet", 100L);
+        DeleteFile deleteFile = createDeleteFile("/data/deletes1.parquet", 10L);
+        WriteResult writeResult =
+                WriteResult.builder().addDataFiles(dataFile).addDeleteFiles(deleteFile).build();
+
+        // Case 1: no rewrite result attached.
+        IcebergWriteResult withoutRewrite = new IcebergWriteResult(writeResult, null);
+        IcebergWriteResult restoredWithoutRewrite = roundTrip(withoutRewrite);
+        assertThat(restoredWithoutRewrite.toString()).isEqualTo(withoutRewrite.toString());
+
+        // Case 2: the same write result, now carrying a rewrite result as well.
+        RewriteDataFileResult rewriteResult =
+                new RewriteDataFileResult(
+                        Collections.singletonList(dataFile), Collections.singletonList(dataFile));
+        IcebergWriteResult withRewrite = new IcebergWriteResult(writeResult, rewriteResult);
+        IcebergWriteResult restoredWithRewrite = roundTrip(withRewrite);
+        assertThat(restoredWithRewrite.toString()).isEqualTo(withRewrite.toString());
+    }
+
+    /** Serializes {@code original} and deserializes it again using the serializer's version. */
+    private IcebergWriteResult roundTrip(IcebergWriteResult original) throws IOException {
+        byte[] bytes = serializer.serialize(original);
+        return serializer.deserialize(serializer.getVersion(), bytes);
+    }
+
+    /** Builds a 1024-byte Parquet position-delete file in the shared test partition. */
+    private DeleteFile createDeleteFile(String path, long recordCount) {
+        FileMetadata.Builder deleteBuilder = FileMetadata.deleteFileBuilder(spec);
+        deleteBuilder
+                .withPath(path)
+                .withFileSizeInBytes(1024)
+                .withPartition(partitionData)
+                .withRecordCount(recordCount)
+                .withFormat(FileFormat.PARQUET)
+                .ofPositionDeletes();
+        return deleteBuilder.build();
+    }
+
+    /** Builds a 4096-byte Parquet data file in the shared test partition. */
+    private DataFile createDataFile(String path, long recordCount) {
+        DataFiles.Builder dataBuilder = DataFiles.builder(spec);
+        dataBuilder
+                .withPath(path)
+                .withFileSizeInBytes(4096)
+                .withPartition(partitionData)
+                .withRecordCount(recordCount)
+                .withFormat(FileFormat.PARQUET);
+        return dataBuilder.build();
+    }
+}

Reply via email to