This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d4dfb45e6 [Iceberg/lake] Iceberg supports append-only non-partitioned 
table (#1524)
d4dfb45e6 is described below

commit d4dfb45e6af7116b07cf07ee1e34dd06f6e7f680
Author: MehulBatra <[email protected]>
AuthorDate: Thu Aug 14 16:44:43 2025 +0530

    [Iceberg/lake] Iceberg supports append-only non-partitioned table (#1524)
---
 .../fluss/lake/writer/LakeTieringFactory.java      |   2 +
 .../fluss/lake/lakestorage/LakeStorageTest.java    |  13 +-
 .../source/enumerator/TieringSourceEnumerator.java |   2 +-
 fluss-lake/fluss-lake-iceberg/pom.xml              |  76 ++++++
 .../iceberg/FlussDataTypeToIcebergDataType.java    |  13 +-
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |   7 +-
 .../fluss/lake/iceberg/IcebergLakeStorage.java     |   3 +-
 .../tiering/FlussRecordAsIcebergRecord.java        | 209 +++++++++++++++++
 .../iceberg/tiering/IcebergCatalogProvider.java    |  50 ++++
 .../lake/iceberg/tiering/IcebergCommittable.java   |  84 +++++++
 .../tiering/IcebergCommittableSerializer.java      |  52 ++++
 .../lake/iceberg/tiering/IcebergLakeCommitter.java | 235 +++++++++++++++++++
 .../tiering/IcebergLakeTieringFactory.java}        |  32 ++-
 .../lake/iceberg/tiering/IcebergLakeWriter.java    | 110 +++++++++
 .../lake/iceberg/tiering/IcebergWriteResult.java   |  48 ++++
 .../tiering/IcebergWriteResultSerializer.java      |  54 +++++
 .../fluss/lake/iceberg/tiering/RecordWriter.java   |  71 ++++++
 .../iceberg/tiering/append/AppendOnlyWriter.java   | 108 +++++++++
 .../lake/iceberg/utils/IcebergConversions.java     |  31 +++
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java |  13 +-
 .../fluss/lake/iceberg/IcebergTieringTest.java     | 261 +++++++++++++++++++++
 ...libaba.fluss.lake.lakestorage.LakeStoragePlugin |  14 +-
 .../lake/paimon/tiering/PaimonLakeCommitter.java   |   2 -
 .../paimon/tiering/PaimonLakeTieringFactory.java   |   2 -
 .../replica/fetcher/RemoteLeaderEndpointTest.java  |  13 +-
 website/docs/quickstart/security.md                |  12 +-
 26 files changed, 1457 insertions(+), 60 deletions(-)

diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
index 61417c9aa..bd7b18701 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
@@ -37,6 +37,8 @@ import java.io.Serializable;
 @PublicEvolving
 public interface LakeTieringFactory<WriteResult, CommittableT> extends 
Serializable {
 
+    String FLUSS_LAKE_TIERING_COMMIT_USER = "__fluss_lake_tiering";
+
     /**
      * Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows.
      *
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
index 9c8ae73d4..304f05e17 100644
--- 
a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 296f1580f..ae10d3e29 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -203,7 +203,7 @@ public class TieringSourceEnumerator
             Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
             LOG.info(
                     "Tiering table {} is failed, fail reason is {}.",
-                    tieringEpoch,
+                    failedTableId,
                     failedEvent.failReason());
             if (tieringEpoch == null) {
                 // shouldn't happen, warn it
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 01918d3aa..52f37d072 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -38,11 +38,87 @@
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-data</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-parquet</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.8.5</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${fluss.hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>protobuf-java</artifactId>
+                    <groupId>com.google.protobuf</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>commons-io</artifactId>
+                    <groupId>commons-io</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Flink test dependency -->
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-flink-${flink.major.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index 2f265acc4..bfd47b05b 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index 5c0e06199..dbc087c1f 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -85,8 +85,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
         String catalogName = icebergProps.getOrDefault("name", 
"fluss-iceberg-catalog");
 
         return buildIcebergCatalog(
-                catalogName, icebergProps, null // Optional: pass Hadoop 
configuration if available
-                );
+                catalogName,
+                icebergProps, // todo: current is an empty configuration, need 
to init from env or
+                // fluss
+                // configurations
+                new org.apache.hadoop.conf.Configuration());
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
index 9a553a3dd..9e8daf365 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,6 +18,7 @@
 package com.alibaba.fluss.lake.iceberg;
 
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
 import com.alibaba.fluss.lake.lakestorage.LakeStorage;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -34,7 +35,7 @@ public class IcebergLakeStorage implements LakeStorage {
 
     @Override
     public LakeTieringFactory<?, ?> createLakeTieringFactory() {
-        throw new UnsupportedOperationException("Not implemented");
+        return new IcebergLakeTieringFactory(icebergConfig);
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
new file mode 100644
index 000000000..f72a40a67
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/**
+ * Wrap Fluss {@link LogRecord} as Iceberg {@link Record}.
+ *
+ * <p>todo: refactor to implement ParquetWriters, OrcWriters, AvroWriters just 
like Flink & Spark
+ * write to iceberg for higher performance
+ */
+public class FlussRecordAsIcebergRecord implements Record {
+
+    // Lake table for iceberg will append three system columns: __bucket, 
__offset,__timestamp
+    private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 3;
+
+    private LogRecord logRecord;
+    private final int bucket;
+    private final Schema icebergSchema;
+    private final RowType flussRowType;
+
+    // the origin row fields in fluss, excluding the system columns in iceberg
+    private int originRowFieldCount;
+    private InternalRow internalRow;
+
+    public FlussRecordAsIcebergRecord(int bucket, Schema icebergSchema, 
RowType flussRowType) {
+        this.bucket = bucket;
+        this.icebergSchema = icebergSchema;
+        this.flussRowType = flussRowType;
+    }
+
+    public void setFlussRecord(LogRecord logRecord) {
+        this.logRecord = logRecord;
+        this.internalRow = logRecord.getRow();
+        this.originRowFieldCount = internalRow.getFieldCount();
+        checkState(
+                originRowFieldCount
+                        == icebergSchema.asStruct().fields().size() - 
LAKE_ICEBERG_SYSTEM_COLUMNS,
+                "The Iceberg table fields count must equals to LogRecord's 
fields count.");
+    }
+
+    @Override
+    public Types.StructType struct() {
+        return icebergSchema.asStruct();
+    }
+
+    @Override
+    public Object getField(String name) {
+        return icebergSchema;
+    }
+
+    @Override
+    public void setField(String name, Object value) {
+        throw new UnsupportedOperationException("method setField is not 
supported.");
+    }
+
+    @Override
+    public Object get(int pos) {
+        // firstly, for system columns
+        if (pos == originRowFieldCount) {
+            // bucket column
+            return bucket;
+        } else if (pos == originRowFieldCount + 1) {
+            // log offset column
+            return logRecord.logOffset();
+        } else if (pos == originRowFieldCount + 2) {
+            // timestamp column
+            return getTimestampLtz(logRecord.timestamp());
+        }
+
+        // handle normal columns
+        if (internalRow.isNullAt(pos)) {
+            return null;
+        }
+
+        DataType dataType = flussRowType.getTypeAt(pos);
+        if (dataType instanceof BooleanType) {
+            return internalRow.getBoolean(pos);
+        } else if (dataType instanceof TinyIntType) {
+            return (int) internalRow.getByte(pos);
+        } else if (dataType instanceof SmallIntType) {
+            return internalRow.getShort(pos);
+        } else if (dataType instanceof IntType) {
+            return internalRow.getInt(pos);
+        } else if (dataType instanceof BigIntType) {
+            return internalRow.getLong(pos);
+        } else if (dataType instanceof FloatType) {
+            return internalRow.getFloat(pos);
+        } else if (dataType instanceof DoubleType) {
+            return internalRow.getDouble(pos);
+        } else if (dataType instanceof StringType) {
+            return internalRow.getString(pos).toString();
+        } else if (dataType instanceof BinaryType) {
+            // Iceberg's Record interface expects ByteBuffer for binary types.
+            return ByteBuffer.wrap(internalRow.getBytes(pos));
+        } else if (dataType instanceof DecimalType) {
+            // Iceberg expects BigDecimal for decimal types.
+            DecimalType decimalType = (DecimalType) dataType;
+            return internalRow
+                    .getDecimal(pos, decimalType.getPrecision(), 
decimalType.getScale())
+                    .toBigDecimal();
+        } else if (dataType instanceof LocalZonedTimestampType) {
+            // Iceberg expects OffsetDateTime for timestamp with local 
timezone.
+            return getTimestampLtz(
+                    internalRow
+                            .getTimestampLtz(
+                                    pos, ((LocalZonedTimestampType) 
dataType).getPrecision())
+                            .toInstant());
+        } else if (dataType instanceof TimestampType) {
+            // Iceberg expects LocalDateType for timestamp without local 
timezone.
+            return internalRow
+                    .getTimestampNtz(pos, ((TimestampType) 
dataType).getPrecision())
+                    .toLocalDateTime();
+        } else if (dataType instanceof DateType) {
+            return DateTimeUtils.toLocalDate(internalRow.getInt(pos));
+        } else if (dataType instanceof TimeType) {
+            return DateTimeUtils.toLocalTime(internalRow.getInt(pos));
+        }
+        throw new UnsupportedOperationException(
+                "Unsupported data type conversion for Fluss type: "
+                        + dataType.getClass().getName());
+    }
+
+    private OffsetDateTime getTimestampLtz(long timestamp) {
+        return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), 
ZoneOffset.UTC);
+    }
+
+    private OffsetDateTime getTimestampLtz(Instant instant) {
+        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+    }
+
+    @Override
+    public Record copy() {
+        throw new UnsupportedOperationException("method copy is not 
supported.");
+    }
+
+    @Override
+    public Record copy(Map<String, Object> overwriteValues) {
+        throw new UnsupportedOperationException("method copy is not 
supported.");
+    }
+
+    @Override
+    public int size() {
+        return icebergSchema.asStruct().fields().size();
+    }
+
+    @Override
+    public <T> T get(int pos, Class<T> javaClass) {
+        Object value = get(pos);
+        if (value == null || javaClass.isInstance(value)) {
+            return javaClass.cast(value);
+        } else {
+            throw new IllegalStateException(
+                    "Not an instance of " + javaClass.getName() + ": " + 
value);
+        }
+    }
+
+    @Override
+    public <T> void set(int pos, T value) {
+        throw new UnsupportedOperationException("method set is not 
supported.");
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
new file mode 100644
index 000000000..1c484743a
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+
+import org.apache.iceberg.catalog.Catalog;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+
+/** Serializable provider that builds an Iceberg {@link Catalog} from the Iceberg configuration. */
+public class IcebergCatalogProvider implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private final Configuration icebergConfig;
+
+    public IcebergCatalogProvider(Configuration icebergConfig) {
+        this.icebergConfig = icebergConfig;
+    }
+
+    /**
+     * Builds a catalog from the configured properties; the catalog name is taken from the {@code
+     * name} property and falls back to {@code fluss-iceberg-catalog}.
+     */
+    public Catalog get() {
+        final Map<String, String> catalogProperties = icebergConfig.toMap();
+        final String name = catalogProperties.getOrDefault("name", "fluss-iceberg-catalog");
+
+        // todo: current is an empty configuration, need to init from env or fluss
+        // configurations
+        final org.apache.hadoop.conf.Configuration hadoopConf =
+                new org.apache.hadoop.conf.Configuration();
+
+        return buildIcebergCatalog(name, catalogProperties, hadoopConf);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
new file mode 100644
index 000000000..938ebc477
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** The committable that derived from {@link IcebergWriteResult} to commit to 
Iceberg. */
+public class IcebergCommittable implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<DataFile> dataFiles;
+    private final List<DeleteFile> deleteFiles;
+
+    private IcebergCommittable(List<DataFile> dataFiles, List<DeleteFile> 
deleteFiles) {
+        this.dataFiles = dataFiles;
+        this.deleteFiles = deleteFiles;
+    }
+
+    public List<DataFile> getDataFiles() {
+        return dataFiles;
+    }
+
+    public List<DeleteFile> getDeleteFiles() {
+        return deleteFiles;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link IcebergCommittable}, collecting {@link DataFile} and 
{@link DeleteFile}
+     * entries.
+     */
+    public static class Builder {
+        private final List<DataFile> dataFiles = new ArrayList<>();
+        private final List<DeleteFile> deleteFiles = new ArrayList<>();
+
+        public Builder addDataFile(DataFile dataFile) {
+            this.dataFiles.add(dataFile);
+            return this;
+        }
+
+        public Builder addDeleteFile(DeleteFile deleteFile) {
+            this.deleteFiles.add(deleteFile);
+            return this;
+        }
+
+        public IcebergCommittable build() {
+            return new IcebergCommittable(new ArrayList<>(dataFiles), new 
ArrayList<>(deleteFiles));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "IcebergCommittable{"
+                + "dataFiles="
+                + dataFiles.size()
+                + ", deleteFiles="
+                + deleteFiles.size()
+                + '}';
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
new file mode 100644
index 000000000..0ec247a74
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+
+/** Serializer for {@link IcebergCommittable}, backed by {@link InstantiationUtils}. */
+public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(IcebergCommittable committable) throws IOException {
+        return InstantiationUtils.serializeObject(committable);
+    }
+
+    /**
+     * Restores a committable from its serialized bytes; the {@code version} argument is not
+     * consulted.
+     *
+     * @throws IOException if deserialization fails, including when a required class is missing
+     */
+    @Override
+    public IcebergCommittable deserialize(int version, byte[] serialized) throws IOException {
+        final ClassLoader classLoader = getClass().getClassLoader();
+        try {
+            return InstantiationUtils.deserializeObject(serialized, classLoader);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
new file mode 100644
index 000000000..6353fb06c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.metadata.TablePath;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.events.CreateSnapshotEvent;
+import org.apache.iceberg.events.Listener;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.WriteResult;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static 
com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** Implementation of {@link LakeCommitter} for Iceberg. */
+public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, 
IcebergCommittable> {
+
+    private final Catalog icebergCatalog;
+    private final Table icebergTable;
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
+
+    public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, 
TablePath tablePath)
+            throws IOException {
+        this.icebergCatalog = icebergCatalogProvider.get();
+        this.icebergTable = getTable(tablePath);
+        // register iceberg listener
+        Listeners.register(new IcebergSnapshotCreateListener(), 
CreateSnapshotEvent.class);
+    }
+
+    @Override
+    public IcebergCommittable toCommittable(List<IcebergWriteResult> 
icebergWriteResults) {
+        // Aggregate all write results into a single committable
+        IcebergCommittable.Builder builder = IcebergCommittable.builder();
+
+        for (IcebergWriteResult result : icebergWriteResults) {
+            WriteResult writeResult = result.getWriteResult();
+
+            // Add data files
+            for (DataFile dataFile : writeResult.dataFiles()) {
+                builder.addDataFile(dataFile);
+            }
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public long commit(IcebergCommittable committable, Map<String, String> 
snapshotProperties)
+            throws IOException {
+        try {
+            // Refresh table to get latest metadata
+            icebergTable.refresh();
+            // Simple append-only case: only data files, no delete files or 
compaction
+            AppendFiles appendFiles = icebergTable.newAppend();
+            for (DataFile dataFile : committable.getDataFiles()) {
+                appendFiles.appendFile(dataFile);
+            }
+            if (!committable.getDeleteFiles().isEmpty()) {
+                throw new IllegalStateException(
+                        "Delete files are not supported in append-only mode. "
+                                + "Found "
+                                + committable.getDeleteFiles().size()
+                                + " delete files.");
+            }
+
+            addFlussProperties(appendFiles, snapshotProperties);
+
+            appendFiles.commit();
+
+            Long commitSnapshotId = currentCommitSnapshotId.get();
+            currentCommitSnapshotId.remove();
+
+            return checkNotNull(
+                    commitSnapshotId, "Iceberg committed snapshot id must be 
non-null.");
+        } catch (Exception e) {
+            throw new IOException("Failed to commit to Iceberg table.", e);
+        }
+    }
+
+    private void addFlussProperties(
+            AppendFiles appendFiles, Map<String, String> snapshotProperties) {
+        appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
+            appendFiles.set(entry.getKey(), entry.getValue());
+        }
+    }
+
+    @Override
+    public void abort(IcebergCommittable committable) {
+        List<String> filesToDelete =
+                committable.getDataFiles().stream()
+                        .map(dataFile -> dataFile.path().toString())
+                        .collect(Collectors.toList());
+        CatalogUtil.deleteFiles(icebergTable.io(), filesToDelete, "data file", 
true);
+    }
+
+    @Nullable
+    @Override
+    public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long 
latestLakeSnapshotIdOfFluss)
+            throws IOException {
+        // todo: may refactor to common methods?
+        Snapshot latestLakeSnapshot =
+                
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+        if (latestLakeSnapshot == null) {
+            return null;
+        }
+
+        // Check if there's a gap between Fluss and Iceberg snapshots
+        if (latestLakeSnapshotIdOfFluss != null
+                && latestLakeSnapshot.snapshotId() <= 
latestLakeSnapshotIdOfFluss) {
+            return null;
+        }
+
+        CommittedLakeSnapshot committedLakeSnapshot =
+                new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId());
+
+        // Reconstruct bucket offsets from snapshot properties
+        Map<String, String> properties = latestLakeSnapshot.summary();
+        if (properties == null) {
+            throw new IOException(
+                    "Failed to load committed lake snapshot properties from 
Iceberg.");
+        }
+
+        String flussOffsetProperties = 
properties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+        if (flussOffsetProperties == null) {
+            throw new IllegalArgumentException(
+                    "Cannot resume tiering from snapshot without bucket offset 
properties. "
+                            + "The snapshot was committed to Iceberg but 
missing Fluss metadata.");
+        }
+
+        for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+            BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+            if (bucketOffset.getPartitionId() != null) {
+                committedLakeSnapshot.addPartitionBucket(
+                        bucketOffset.getPartitionId(),
+                        bucketOffset.getPartitionQualifiedName(),
+                        bucketOffset.getBucket(),
+                        bucketOffset.getLogOffset());
+            } else {
+                committedLakeSnapshot.addBucket(
+                        bucketOffset.getBucket(), bucketOffset.getLogOffset());
+            }
+        }
+
+        return committedLakeSnapshot;
+    }
+
+    @Override
+    public void close() throws Exception {
+        try {
+            if (icebergCatalog != null && icebergCatalog instanceof 
AutoCloseable) {
+                ((AutoCloseable) icebergCatalog).close();
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to close IcebergLakeCommitter.", e);
+        }
+    }
+
+    private Table getTable(TablePath tablePath) throws IOException {
+        try {
+            TableIdentifier tableId = toIceberg(tablePath);
+            return icebergCatalog.loadTable(tableId);
+        } catch (Exception e) {
+            throw new IOException("Failed to get table " + tablePath + " in 
Iceberg.", e);
+        }
+    }
+
+    @Nullable
+    private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) {
+        icebergTable.refresh();
+
+        // Find the latest snapshot committed by Fluss
+        Iterable<Snapshot> snapshots = icebergTable.snapshots();
+        Snapshot latestFlussSnapshot = null;
+
+        for (Snapshot snapshot : snapshots) {
+            Map<String, String> summary = snapshot.summary();
+            if (summary != null && 
commitUser.equals(summary.get("commit-user"))) {
+                if (latestFlussSnapshot == null
+                        || snapshot.snapshotId() > 
latestFlussSnapshot.snapshotId()) {
+                    latestFlussSnapshot = snapshot;
+                }
+            }
+        }
+
+        return latestFlussSnapshot;
+    }
+
+    /** A {@link Listener} to listen the iceberg create snapshot event. */
+    public static class IcebergSnapshotCreateListener implements 
Listener<CreateSnapshotEvent> {
+        @Override
+        public void notify(CreateSnapshotEvent createSnapshotEvent) {
+            currentCommitSnapshotId.set(createSnapshotEvent.snapshotId());
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
similarity index 56%
copy from 
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
copy to 
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
index 457009527..e3a101c4a 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.paimon.tiering;
+package com.alibaba.fluss.lake.iceberg.tiering;
 
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.committer.CommitterInitContext;
@@ -27,39 +27,37 @@ import com.alibaba.fluss.lake.writer.WriterInitContext;
 
 import java.io.IOException;
 
-/** Implementation of {@link LakeTieringFactory} for Paimon . */
-public class PaimonLakeTieringFactory
-        implements LakeTieringFactory<PaimonWriteResult, PaimonCommittable> {
-
-    public static final String FLUSS_LAKE_TIERING_COMMIT_USER = 
"__fluss_lake_tiering";
+/** Implementation of {@link LakeTieringFactory} for Iceberg. */
+public class IcebergLakeTieringFactory
+        implements LakeTieringFactory<IcebergWriteResult, IcebergCommittable> {
 
     private static final long serialVersionUID = 1L;
 
-    private final PaimonCatalogProvider paimonCatalogProvider;
+    private final IcebergCatalogProvider icebergCatalogProvider;
 
-    public PaimonLakeTieringFactory(Configuration paimonConfig) {
-        this.paimonCatalogProvider = new PaimonCatalogProvider(paimonConfig);
+    public IcebergLakeTieringFactory(Configuration icebergConfig) {
+        this.icebergCatalogProvider = new 
IcebergCatalogProvider(icebergConfig);
     }
 
     @Override
-    public LakeWriter<PaimonWriteResult> createLakeWriter(WriterInitContext 
writerInitContext)
+    public LakeWriter<IcebergWriteResult> createLakeWriter(WriterInitContext 
writerInitContext)
             throws IOException {
-        return new PaimonLakeWriter(paimonCatalogProvider, writerInitContext);
+        return new IcebergLakeWriter(icebergCatalogProvider, 
writerInitContext);
     }
 
     @Override
-    public SimpleVersionedSerializer<PaimonWriteResult> 
getWriteResultSerializer() {
-        return new PaimonWriteResultSerializer();
+    public SimpleVersionedSerializer<IcebergWriteResult> 
getWriteResultSerializer() {
+        return new IcebergWriteResultSerializer();
     }
 
     @Override
-    public LakeCommitter<PaimonWriteResult, PaimonCommittable> 
createLakeCommitter(
+    public LakeCommitter<IcebergWriteResult, IcebergCommittable> 
createLakeCommitter(
             CommitterInitContext committerInitContext) throws IOException {
-        return new PaimonLakeCommitter(paimonCatalogProvider, 
committerInitContext.tablePath());
+        return new IcebergLakeCommitter(icebergCatalogProvider, 
committerInitContext.tablePath());
     }
 
     @Override
-    public SimpleVersionedSerializer<PaimonCommittable> 
getCommittableSerializer() {
-        return new PaimonCommittableSerializer();
+    public SimpleVersionedSerializer<IcebergCommittable> 
getCommittableSerializer() {
+        return new IcebergCommittableSerializer();
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
new file mode 100644
index 000000000..e75378941
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+
+/**
+ * Implementation of {@link LakeWriter} for Iceberg.
+ *
+ * <p>Loads the target Iceberg table from the catalog and forwards every record to a {@link
+ * RecordWriter}. Only non-partitioned tables are supported; records are written in append-only
+ * fashion.
+ */
+public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
+
+    private final Catalog icebergCatalog;
+    private final Table icebergTable;
+    private final RecordWriter recordWriter;
+
+    /**
+     * Creates a writer for the table and bucket described by {@code writerInitContext}.
+     *
+     * @throws IOException if the Iceberg table cannot be loaded
+     * @throws UnsupportedOperationException if the Iceberg table is partitioned
+     */
+    public IcebergLakeWriter(
+            IcebergCatalogProvider icebergCatalogProvider, WriterInitContext writerInitContext)
+            throws IOException {
+        this.icebergCatalog = icebergCatalogProvider.get();
+        this.icebergTable = getTable(writerInitContext.tablePath());
+
+        // Create record writer based on table type
+        // For now, only supporting non-partitioned append-only tables
+        this.recordWriter = createRecordWriter(writerInitContext);
+    }
+
+    /** Builds the record writer; rejects partitioned tables, which are not supported yet. */
+    private RecordWriter createRecordWriter(WriterInitContext writerInitContext) {
+        if (!icebergTable.spec().isUnpartitioned()) {
+            throw new UnsupportedOperationException("Partitioned tables are not yet supported");
+        }
+
+        // For now, assume append-only (no primary keys)
+
+        return new AppendOnlyWriter(
+                icebergTable,
+                writerInitContext.schema().getRowType(),
+                writerInitContext.tableBucket(),
+                null, // No partition for non-partitioned table
+                Collections.emptyList() // No partition keys
+                );
+    }
+
+    /** Writes one Fluss record; any failure is reported as an {@link IOException}. */
+    @Override
+    public void write(LogRecord record) throws IOException {
+        try {
+            recordWriter.write(record);
+        } catch (Exception e) {
+            throw new IOException("Failed to write Fluss record to Iceberg.", e);
+        }
+    }
+
+    /** Completes the underlying writer and wraps what it produced in a write result. */
+    @Override
+    public IcebergWriteResult complete() throws IOException {
+        try {
+            WriteResult writeResult = recordWriter.complete();
+            return new IcebergWriteResult(writeResult);
+        } catch (Exception e) {
+            throw new IOException("Failed to complete Iceberg write.", e);
+        }
+    }
+
+    /**
+     * Closes the record writer and then the catalog.
+     *
+     * <p>Both are always attempted: a failure while closing the record writer must not leave
+     * the catalog open. The first failure is reported as the cause and a later one is attached
+     * to it as a suppressed exception.
+     *
+     * @throws IOException if closing the record writer or the catalog fails
+     */
+    @Override
+    public void close() throws IOException {
+        Exception failure = null;
+
+        try {
+            if (recordWriter != null) {
+                recordWriter.close();
+            }
+        } catch (Exception e) {
+            failure = e;
+        }
+
+        try {
+            if (icebergCatalog instanceof Closeable) {
+                ((Closeable) icebergCatalog).close();
+            }
+        } catch (Exception e) {
+            if (failure == null) {
+                failure = e;
+            } else {
+                failure.addSuppressed(e);
+            }
+        }
+
+        if (failure != null) {
+            throw new IOException("Failed to close IcebergLakeWriter.", failure);
+        }
+    }
+
+    /** Loads the Iceberg table for {@code tablePath}, wrapping any failure in an IOException. */
+    private Table getTable(TablePath tablePath) throws IOException {
+        try {
+            return icebergCatalog.loadTable(toIceberg(tablePath));
+        } catch (Exception e) {
+            throw new IOException("Failed to get table " + tablePath + " in Iceberg.", e);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
new file mode 100644
index 000000000..bc0c0605f
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.Serializable;
+
+/** The write result of Iceberg lake writer to pass to commiter to commit. */
+public class IcebergWriteResult implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final WriteResult writeResult;
+
+    public IcebergWriteResult(WriteResult writeResult) {
+        this.writeResult = writeResult;
+    }
+
+    public WriteResult getWriteResult() {
+        return writeResult;
+    }
+
+    @Override
+    public String toString() {
+        return "IcebergWriteResult{"
+                + "dataFiles="
+                + writeResult.dataFiles().length
+                + ", deleteFiles="
+                + writeResult.deleteFiles().length
+                + '}';
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
new file mode 100644
index 000000000..86c6e6849
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.utils.InstantiationUtils;
+
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.IOException;
+
+/**
+ * A {@link SimpleVersionedSerializer} for {@link IcebergWriteResult}. Only the wrapped Iceberg
+ * {@link WriteResult} is written; the wrapper is re-created on deserialization.
+ */
+public class IcebergWriteResultSerializer implements SimpleVersionedSerializer<IcebergWriteResult> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    /** Returns the version written by this serializer. */
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /** Serializes the Iceberg write result held by {@code icebergWriteResult}. */
+    @Override
+    public byte[] serialize(IcebergWriteResult icebergWriteResult) throws IOException {
+        WriteResult payload = icebergWriteResult.getWriteResult();
+        return InstantiationUtils.serializeObject(payload);
+    }
+
+    /**
+     * Restores a write result from {@code serialized}.
+     *
+     * <p>The {@code version} argument is not consulted. Classes are resolved with the class
+     * loader that loaded this serializer.
+     *
+     * @throws IOException if reading fails or a required class cannot be found
+     */
+    @Override
+    public IcebergWriteResult deserialize(int version, byte[] serialized) throws IOException {
+        try {
+            WriteResult restored =
+                    InstantiationUtils.deserializeObject(serialized, getClass().getClassLoader());
+            return new IcebergWriteResult(restored);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
new file mode 100644
index 000000000..d6cde3394
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** A base interface to write {@link LogRecord} to Iceberg. */
+public abstract class RecordWriter implements AutoCloseable {
+
+    protected final TaskWriter<Record> taskWriter;
+    protected final Schema icebergSchema;
+    protected final int bucket;
+    @Nullable protected final String partition;
+    protected final FlussRecordAsIcebergRecord flussRecordAsIcebergRecord;
+
+    public RecordWriter(
+            TaskWriter<Record> taskWriter,
+            Schema icebergSchema,
+            RowType flussRowType,
+            TableBucket tableBucket,
+            @Nullable String partition,
+            List<String> partitionKeys) {
+        this.taskWriter = taskWriter;
+        this.icebergSchema = icebergSchema;
+        this.bucket = tableBucket.getBucket();
+        this.partition = partition; // null for non-partitioned tables
+        this.flussRecordAsIcebergRecord =
+                new FlussRecordAsIcebergRecord(
+                        tableBucket.getBucket(), icebergSchema, flussRowType);
+    }
+
+    public abstract void write(LogRecord record) throws Exception;
+
+    public WriteResult complete() throws Exception {
+        // Complete the task writer and get write result
+        return taskWriter.complete();
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (taskWriter != null) {
+            taskWriter.close();
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
new file mode 100644
index 000000000..9c5607477
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.append;
+
+import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static 
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/**
+ * A {@link RecordWriter} for append-only Iceberg tables. Records are written with an {@link
+ * UnpartitionedWriter} configured from the table's properties.
+ */
+public class AppendOnlyWriter extends RecordWriter {
+
+    /**
+     * @param icebergTable the Iceberg table to write to
+     * @param flussRowType the row type of the Fluss table
+     * @param tableBucket the Fluss bucket this writer writes for
+     * @param partition the partition, {@code null} for non-partitioned tables
+     * @param partitionKeys the partition keys
+     */
+    public AppendOnlyWriter(
+            Table icebergTable,
+            RowType flussRowType,
+            TableBucket tableBucket,
+            @Nullable String partition,
+            List<String> partitionKeys) {
+        super(
+                createTaskWriter(icebergTable, tableBucket),
+                icebergTable.schema(),
+                flussRowType,
+                tableBucket,
+                partition,
+                partitionKeys);
+    }
+
+    /** Presents {@code record} as an Iceberg record and hands it to the task writer. */
+    @Override
+    public void write(LogRecord record) throws Exception {
+        flussRecordAsIcebergRecord.setFlussRecord(record);
+        taskWriter.write(flussRecordAsIcebergRecord);
+    }
+
+    /**
+     * Builds the task writer. The target file size and the file format are read from the table
+     * properties, falling back to Iceberg's defaults when a property is absent.
+     */
+    private static TaskWriter<Record> createTaskWriter(
+            Table icebergTable, TableBucket tableBucket) {
+        long maxFileSizeBytes =
+                PropertyUtil.propertyAsLong(
+                        icebergTable.properties(),
+                        WRITE_TARGET_FILE_SIZE_BYTES,
+                        WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+
+        FileAppenderFactory<Record> appenders = new GenericAppenderFactory(icebergTable.schema());
+
+        String configuredFormat =
+                PropertyUtil.propertyAsString(
+                        icebergTable.properties(),
+                        TableProperties.DEFAULT_FILE_FORMAT,
+                        TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+        FileFormat dataFormat = FileFormat.fromString(configuredFormat);
+
+        // the bucket id is passed as the first id of the file factory; the task id is always 0
+        int bucketId = tableBucket.getBucket();
+        OutputFileFactory dataFiles =
+                OutputFileFactory.builderFor(icebergTable, bucketId, 0).format(dataFormat).build();
+
+        return new UnpartitionedWriter<>(
+                icebergTable.spec(),
+                dataFormat,
+                appenders,
+                dataFiles,
+                icebergTable.io(),
+                maxFileSizeBytes);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
new file mode 100644
index 000000000..de237c7bf
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.utils;
+
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Utility class for static conversions between Fluss and Iceberg types. */
+public final class IcebergConversions {
+
+    /** Not instantiable: this class only hosts static helpers. */
+    private IcebergConversions() {}
+
+    /**
+     * Convert Fluss TablePath to Iceberg TableIdentifier.
+     *
+     * <p>The Fluss database name is used as the single namespace level and the Fluss table name
+     * as the Iceberg table name.
+     *
+     * @param tablePath the Fluss table path
+     * @return the matching Iceberg table identifier
+     */
+    public static TableIdentifier toIceberg(TablePath tablePath) {
+        return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index e2450b80e..393f5a17e 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
new file mode 100644
index 000000000..e8455f0df
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergCommittable;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergWriteResult;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
+class IcebergTieringTest {
+
+    private @TempDir File tempWarehouseDir;
+    private IcebergLakeTieringFactory icebergLakeTieringFactory;
+    private Catalog icebergCatalog;
+
+    /**
+     * Builds a fresh catalog of type {@code hadoop} whose warehouse is the per-test temp
+     * directory, and the tiering factory under test from the same configuration.
+     */
+    @BeforeEach
+    void beforeEach() {
+        Configuration icebergConf = new Configuration();
+        icebergConf.setString("name", "test");
+        icebergConf.setString("type", "hadoop");
+        icebergConf.setString("warehouse", "file://" + tempWarehouseDir);
+
+        icebergCatalog = new IcebergCatalogProvider(icebergConf).get();
+        icebergLakeTieringFactory = new IcebergLakeTieringFactory(icebergConf);
+    }
+
+    /**
+     * Writes records for several buckets, commits all write results with one commit call and
+     * then reads every bucket back from Iceberg to verify the tiered rows.
+     *
+     * <p>Write results and the committable are round-tripped through their serializers before
+     * they are used, so the serializers are exercised as well.
+     */
+    @Test
+    void testTieringWriteTable() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "test_table");
+        createTable(tablePath);
+
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+
+        int bucketNum = 3;
+
+        Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+
+        List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
+                icebergLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<IcebergCommittable> committableSerializer =
+                icebergLakeTieringFactory.getCommittableSerializer();
+
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            try (LakeWriter<IcebergWriteResult> writer = createLakeWriter(tablePath, bucket)) {
+                List<LogRecord> records = genLogTableRecords(bucket, 5);
+                for (LogRecord record : records) {
+                    writer.write(record);
+                }
+                recordsByBucket.put(bucket, records);
+                IcebergWriteResult result = writer.complete();
+                // round-trip through the serializer so that it is covered by this test too
+                byte[] serialized = writeResultSerializer.serialize(result);
+                icebergWriteResults.add(
+                        writeResultSerializer.deserialize(
+                                writeResultSerializer.getVersion(), serialized));
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<IcebergWriteResult, IcebergCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            IcebergCommittable icebergCommittable =
+                    lakeCommitter.toCommittable(icebergWriteResults);
+            byte[] serialized = committableSerializer.serialize(icebergCommittable);
+            icebergCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            long snapshot =
+                    lakeCommitter.commit(icebergCommittable, Collections.singletonMap("k1", "v1"));
+            icebergTable.refresh();
+            Snapshot icebergSnapshot = icebergTable.currentSnapshot();
+            assertThat(snapshot).isEqualTo(icebergSnapshot.snapshotId());
+            assertThat(icebergSnapshot.summary()).containsEntry("k1", "v1");
+        }
+
+        // then, check data of every bucket that was written above
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            List<LogRecord> expectRecords = recordsByBucket.get(bucket);
+            // the iterator is closeable; release it once the bucket has been verified
+            try (CloseableIterator<Record> actualRecords = getIcebergRows(icebergTable, bucket)) {
+                verifyLogTableRecords(actualRecords, bucket, expectRecords);
+            }
+        }
+    }
+
+    /**
+     * Creates a lake writer from the factory under test for the given bucket.
+     *
+     * <p>The init context describes table id {@code 0} without a partition, carries no custom
+     * properties and exposes a three-column Fluss schema ({@code c1 INT}, {@code c2 STRING},
+     * {@code c3 STRING}).
+     *
+     * @param tablePath the table the writer writes to
+     * @param bucket the bucket the writer is responsible for
+     * @return the writer created by the factory
+     * @throws IOException if the factory fails to create the writer
+     */
+    private LakeWriter<IcebergWriteResult> createLakeWriter(TablePath tablePath, int bucket)
+            throws IOException {
+        WriterInitContext initContext =
+                new WriterInitContext() {
+                    @Override
+                    public com.alibaba.fluss.metadata.Schema schema() {
+                        return com.alibaba.fluss.metadata.Schema.newBuilder()
+                                .column("c1", DataTypes.INT())
+                                .column("c2", DataTypes.STRING())
+                                .column("c3", DataTypes.STRING())
+                                .build();
+                    }
+
+                    @Override
+                    public TablePath tablePath() {
+                        return tablePath;
+                    }
+
+                    @Override
+                    public TableBucket tableBucket() {
+                        return new TableBucket(0, null, bucket);
+                    }
+
+                    @Nullable
+                    @Override
+                    public String partition() {
+                        return null;
+                    }
+
+                    @Override
+                    public Map<String, String> customProperties() {
+                        return Collections.emptyMap();
+                    }
+                };
+        return icebergLakeTieringFactory.createLakeWriter(initContext);
+    }
+
+    /**
+     * Creates a lake committer from the factory under test, using an init context that only
+     * supplies {@code tablePath}.
+     *
+     * @param tablePath the table whose write results will be committed
+     * @return the committer created by the factory
+     * @throws IOException if the factory fails to create the committer
+     */
+    private LakeCommitter<IcebergWriteResult, IcebergCommittable> createLakeCommitter(
+            TablePath tablePath) throws IOException {
+        return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
+    }
+
+    /**
+     * Generates {@code numRecords} append-only log records for one bucket.
+     *
+     * <p>Record {@code i} has log offset {@code i}, the current wall-clock time as timestamp
+     * and the row {@code (i, "bucket<bucket>_<i>", "bucket<bucket>")}.
+     *
+     * @param bucket the bucket id embedded in the string columns
+     * @param numRecords how many records to generate
+     * @return the generated records, ordered by log offset
+     */
+    private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
+        final String bucketLabel = "bucket" + bucket;
+        List<LogRecord> generated = new ArrayList<>();
+        int offset = 0;
+        while (offset < numRecords) {
+            GenericRow row = new GenericRow(3);
+            row.setField(0, offset);
+            row.setField(1, BinaryString.fromString(bucketLabel + "_" + offset));
+            row.setField(2, BinaryString.fromString(bucketLabel));
+
+            generated.add(
+                    new GenericRecord(
+                            offset, System.currentTimeMillis(), ChangeType.APPEND_ONLY, row));
+            offset++;
+        }
+        return generated;
+    }
+
+    /**
+     * Creates the Iceberg table for {@code tablePath}, creating its namespace first when the
+     * catalog supports namespaces and the namespace does not exist yet.
+     *
+     * <p>The schema has three optional business columns ({@code c1}, {@code c2}, {@code c3})
+     * followed by the required system columns for bucket, offset and timestamp.
+     *
+     * @param tablePath the Fluss table path to create the Iceberg table for
+     */
+    private void createTable(TablePath tablePath) throws Exception {
+        // use the shared conversion so the table is created under exactly the identifier that
+        // the test later uses to load it
+        TableIdentifier tableId = toIceberg(tablePath);
+        Namespace namespace = tableId.namespace();
+        if (icebergCatalog instanceof SupportsNamespaces) {
+            SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
+            if (!ns.namespaceExists(namespace)) {
+                ns.createNamespace(namespace);
+            }
+        }
+
+        org.apache.iceberg.Schema schema =
+                new org.apache.iceberg.Schema(
+                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+                        Types.NestedField.optional(2, "c2", Types.StringType.get()),
+                        Types.NestedField.optional(3, "c3", Types.StringType.get()),
+                        Types.NestedField.required(4, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                        Types.NestedField.required(5, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                        Types.NestedField.required(
+                                6, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+
+        icebergCatalog.createTable(tableId, schema);
+    }
+
+    /**
+     * Opens a generic Iceberg read over {@code table}, filtered to rows whose bucket system
+     * column equals {@code bucket}.
+     *
+     * @return an iterator over the matching rows; the caller is expected to close it
+     */
+    private CloseableIterator<Record> getIcebergRows(Table table, int bucket) {
+        IcebergGenerics.ScanBuilder bucketScan =
+                IcebergGenerics.read(table).where(Expressions.equal(BUCKET_COLUMN_NAME, bucket));
+        return bucketScan.build().iterator();
+    }
+
+    /**
+     * Asserts that {@code actualRecords} yields exactly the rows tiered from {@code
+     * expectRecords}, in the same order, and nothing more.
+     *
+     * @param actualRecords the rows read back from Iceberg for one bucket
+     * @param expectBucket the bucket id every row must carry in its bucket system column
+     * @param expectRecords the Fluss log records that were written for that bucket
+     */
+    private void verifyLogTableRecords(
+            CloseableIterator<Record> actualRecords,
+            int expectBucket,
+            List<LogRecord> expectRecords) {
+        for (LogRecord expectRecord : expectRecords) {
+            // fail with a readable assertion rather than a NoSuchElementException when rows
+            // are missing
+            assertThat(actualRecords.hasNext()).isTrue();
+            Record actualRecord = actualRecords.next();
+            // check business columns:
+            assertThat(actualRecord.get(0)).isEqualTo(expectRecord.getRow().getInt(0));
+            assertThat(actualRecord.get(1, String.class))
+                    .isEqualTo(expectRecord.getRow().getString(1).toString());
+            assertThat(actualRecord.get(2, String.class))
+                    .isEqualTo(expectRecord.getRow().getString(2).toString());
+            // check system columns: __bucket, __offset, __timestamp
+            assertThat(actualRecord.get(3)).isEqualTo(expectBucket);
+            assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset());
+            assertThat(
+                            actualRecord
+                                    .get(5, OffsetDateTime.class)
+                                    .atZoneSameInstant(ZoneOffset.UTC)
+                                    .toInstant()
+                                    .toEpochMilli())
+                    .isEqualTo(expectRecord.timestamp());
+        }
+        // no unexpected extra rows may have been tiered for this bucket
+        assertThat(actualRecords.hasNext()).isFalse();
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
 
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
index 693ee63cc..7ece859bd 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
+++ 
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
@@ -1,11 +1,13 @@
 #
-# Copyright (c) 2025 Alibaba Group Holding Ltd.
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
+#      http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 08bcae786..5fbe54c7e 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -55,7 +55,6 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     private final Catalog paimonCatalog;
     private final FileStoreTable fileStoreTable;
     private FileStoreCommit fileStoreCommit;
-    private final TablePath tablePath;
     private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
@@ -63,7 +62,6 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
             throws IOException {
         this.paimonCatalog = paimonCatalogProvider.get();
         this.fileStoreTable = getTable(tablePath);
-        this.tablePath = tablePath;
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index 457009527..cc8c45e4d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -31,8 +31,6 @@ import java.io.IOException;
 public class PaimonLakeTieringFactory
         implements LakeTieringFactory<PaimonWriteResult, PaimonCommittable> {
 
-    public static final String FLUSS_LAKE_TIERING_COMMIT_USER = 
"__fluss_lake_tiering";
-
     private static final long serialVersionUID = 1L;
 
     private final PaimonCatalogProvider paimonCatalogProvider;
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
index ca1cb2757..a25e335cb 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
@@ -1,11 +1,12 @@
 /*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/website/docs/quickstart/security.md 
b/website/docs/quickstart/security.md
index e3356e927..44955be67 100644
--- a/website/docs/quickstart/security.md
+++ b/website/docs/quickstart/security.md
@@ -4,11 +4,13 @@ sidebar_position: 2
 ---
 
 <!--
- Copyright (c) 2025 Alibaba Group Holding Ltd.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
 
       http://www.apache.org/licenses/LICENSE-2.0
 

Reply via email to