This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d4dfb45e6 [Iceberg/lake] Iceberg supports append-only non-partitioned
table (#1524)
d4dfb45e6 is described below
commit d4dfb45e6af7116b07cf07ee1e34dd06f6e7f680
Author: MehulBatra <[email protected]>
AuthorDate: Thu Aug 14 16:44:43 2025 +0530
[Iceberg/lake] Iceberg supports append-only non-partitioned table (#1524)
---
.../fluss/lake/writer/LakeTieringFactory.java | 2 +
.../fluss/lake/lakestorage/LakeStorageTest.java | 13 +-
.../source/enumerator/TieringSourceEnumerator.java | 2 +-
fluss-lake/fluss-lake-iceberg/pom.xml | 76 ++++++
.../iceberg/FlussDataTypeToIcebergDataType.java | 13 +-
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 7 +-
.../fluss/lake/iceberg/IcebergLakeStorage.java | 3 +-
.../tiering/FlussRecordAsIcebergRecord.java | 209 +++++++++++++++++
.../iceberg/tiering/IcebergCatalogProvider.java | 50 ++++
.../lake/iceberg/tiering/IcebergCommittable.java | 84 +++++++
.../tiering/IcebergCommittableSerializer.java | 52 ++++
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 235 +++++++++++++++++++
.../tiering/IcebergLakeTieringFactory.java} | 32 ++-
.../lake/iceberg/tiering/IcebergLakeWriter.java | 110 +++++++++
.../lake/iceberg/tiering/IcebergWriteResult.java | 48 ++++
.../tiering/IcebergWriteResultSerializer.java | 54 +++++
.../fluss/lake/iceberg/tiering/RecordWriter.java | 71 ++++++
.../iceberg/tiering/append/AppendOnlyWriter.java | 108 +++++++++
.../lake/iceberg/utils/IcebergConversions.java | 31 +++
.../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 13 +-
.../fluss/lake/iceberg/IcebergTieringTest.java | 261 +++++++++++++++++++++
...libaba.fluss.lake.lakestorage.LakeStoragePlugin | 14 +-
.../lake/paimon/tiering/PaimonLakeCommitter.java | 2 -
.../paimon/tiering/PaimonLakeTieringFactory.java | 2 -
.../replica/fetcher/RemoteLeaderEndpointTest.java | 13 +-
website/docs/quickstart/security.md | 12 +-
26 files changed, 1457 insertions(+), 60 deletions(-)
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
index 61417c9aa..bd7b18701 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/writer/LakeTieringFactory.java
@@ -37,6 +37,8 @@ import java.io.Serializable;
@PublicEvolving
public interface LakeTieringFactory<WriteResult, CommittableT> extends
Serializable {
+ String FLUSS_LAKE_TIERING_COMMIT_USER = "__fluss_lake_tiering";
+
/**
* Creates a lake writer to write Fluss's rows to Paimon/Iceberg rows.
*
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
index 9c8ae73d4..304f05e17 100644
---
a/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
+++
b/fluss-common/src/test/java/com/alibaba/fluss/lake/lakestorage/LakeStorageTest.java
@@ -1,11 +1,12 @@
/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 296f1580f..ae10d3e29 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -203,7 +203,7 @@ public class TieringSourceEnumerator
Long tieringEpoch = tieringTableEpochs.remove(failedTableId);
LOG.info(
"Tiering table {} is failed, fail reason is {}.",
- tieringEpoch,
+ failedTableId,
failedEvent.failReason());
if (tieringEpoch == null) {
// shouldn't happen, warn it
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 01918d3aa..52f37d072 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -38,11 +38,87 @@
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${iceberg.version}</version>
+ </dependency>
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-common</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-flink-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>2.8.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${fluss.hadoop.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>avro</artifactId>
+ <groupId>org.apache.avro</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>log4j</artifactId>
+ <groupId>log4j</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>slf4j-log4j12</artifactId>
+ <groupId>org.slf4j</groupId>
+ </exclusion>
+ <exclusion>
+ <groupId>ch.qos.reload4j</groupId>
+ <artifactId>reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jdk.tools</artifactId>
+ <groupId>jdk.tools</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>protobuf-java</artifactId>
+ <groupId>com.google.protobuf</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>commons-io</artifactId>
+ <groupId>commons-io</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Flink test dependency -->
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-flink-${flink.major.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
index 2f265acc4..bfd47b05b 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -1,11 +1,12 @@
/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index 5c0e06199..dbc087c1f 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -85,8 +85,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
String catalogName = icebergProps.getOrDefault("name",
"fluss-iceberg-catalog");
return buildIcebergCatalog(
- catalogName, icebergProps, null // Optional: pass Hadoop
configuration if available
- );
+ catalogName,
+                icebergProps, // todo: currently an empty configuration; needs
to be initialized from env or
+                // Fluss
+                // configurations
+ new org.apache.hadoop.conf.Configuration());
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
index 9a553a3dd..9e8daf365 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,6 +18,7 @@
package com.alibaba.fluss.lake.iceberg;
import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -34,7 +35,7 @@ public class IcebergLakeStorage implements LakeStorage {
@Override
public LakeTieringFactory<?, ?> createLakeTieringFactory() {
- throw new UnsupportedOperationException("Not implemented");
+ return new IcebergLakeTieringFactory(icebergConfig);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
new file mode 100644
index 000000000..f72a40a67
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/FlussRecordAsIcebergRecord.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.DateTimeUtils;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Map;
+
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/**
+ * Wrap Fluss {@link LogRecord} as Iceberg {@link Record}.
+ *
+ * <p>todo: refactor to implement ParquetWriters, OrcWriters, AvroWriters just
as Flink & Spark
+ * write to Iceberg, for higher performance
+ */
+public class FlussRecordAsIcebergRecord implements Record {
+
+    // Lake table for iceberg will append three system columns: __bucket,
__offset, __timestamp
+ private static final int LAKE_ICEBERG_SYSTEM_COLUMNS = 3;
+
+ private LogRecord logRecord;
+ private final int bucket;
+ private final Schema icebergSchema;
+ private final RowType flussRowType;
+
+ // the origin row fields in fluss, excluding the system columns in iceberg
+ private int originRowFieldCount;
+ private InternalRow internalRow;
+
+ public FlussRecordAsIcebergRecord(int bucket, Schema icebergSchema,
RowType flussRowType) {
+ this.bucket = bucket;
+ this.icebergSchema = icebergSchema;
+ this.flussRowType = flussRowType;
+ }
+
+ public void setFlussRecord(LogRecord logRecord) {
+ this.logRecord = logRecord;
+ this.internalRow = logRecord.getRow();
+ this.originRowFieldCount = internalRow.getFieldCount();
+ checkState(
+ originRowFieldCount
+ == icebergSchema.asStruct().fields().size() -
LAKE_ICEBERG_SYSTEM_COLUMNS,
+                "The Iceberg table field count must equal the LogRecord's
field count.");
+ }
+
+ @Override
+ public Types.StructType struct() {
+ return icebergSchema.asStruct();
+ }
+
+ @Override
+ public Object getField(String name) {
+ return icebergSchema;
+ }
+
+ @Override
+ public void setField(String name, Object value) {
+ throw new UnsupportedOperationException("method setField is not
supported.");
+ }
+
+ @Override
+ public Object get(int pos) {
+ // firstly, for system columns
+ if (pos == originRowFieldCount) {
+ // bucket column
+ return bucket;
+ } else if (pos == originRowFieldCount + 1) {
+ // log offset column
+ return logRecord.logOffset();
+ } else if (pos == originRowFieldCount + 2) {
+ // timestamp column
+ return getTimestampLtz(logRecord.timestamp());
+ }
+
+ // handle normal columns
+ if (internalRow.isNullAt(pos)) {
+ return null;
+ }
+
+ DataType dataType = flussRowType.getTypeAt(pos);
+ if (dataType instanceof BooleanType) {
+ return internalRow.getBoolean(pos);
+ } else if (dataType instanceof TinyIntType) {
+ return (int) internalRow.getByte(pos);
+ } else if (dataType instanceof SmallIntType) {
+ return internalRow.getShort(pos);
+ } else if (dataType instanceof IntType) {
+ return internalRow.getInt(pos);
+ } else if (dataType instanceof BigIntType) {
+ return internalRow.getLong(pos);
+ } else if (dataType instanceof FloatType) {
+ return internalRow.getFloat(pos);
+ } else if (dataType instanceof DoubleType) {
+ return internalRow.getDouble(pos);
+ } else if (dataType instanceof StringType) {
+ return internalRow.getString(pos).toString();
+ } else if (dataType instanceof BinaryType) {
+ // Iceberg's Record interface expects ByteBuffer for binary types.
+ return ByteBuffer.wrap(internalRow.getBytes(pos));
+ } else if (dataType instanceof DecimalType) {
+ // Iceberg expects BigDecimal for decimal types.
+ DecimalType decimalType = (DecimalType) dataType;
+ return internalRow
+ .getDecimal(pos, decimalType.getPrecision(),
decimalType.getScale())
+ .toBigDecimal();
+ } else if (dataType instanceof LocalZonedTimestampType) {
+ // Iceberg expects OffsetDateTime for timestamp with local
timezone.
+ return getTimestampLtz(
+ internalRow
+ .getTimestampLtz(
+ pos, ((LocalZonedTimestampType)
dataType).getPrecision())
+ .toInstant());
+ } else if (dataType instanceof TimestampType) {
+ // Iceberg expects LocalDateType for timestamp without local
timezone.
+ return internalRow
+ .getTimestampNtz(pos, ((TimestampType)
dataType).getPrecision())
+ .toLocalDateTime();
+ } else if (dataType instanceof DateType) {
+ return DateTimeUtils.toLocalDate(internalRow.getInt(pos));
+ } else if (dataType instanceof TimeType) {
+ return DateTimeUtils.toLocalTime(internalRow.getInt(pos));
+ }
+ throw new UnsupportedOperationException(
+ "Unsupported data type conversion for Fluss type: "
+ + dataType.getClass().getName());
+ }
+
+ private OffsetDateTime getTimestampLtz(long timestamp) {
+ return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp),
ZoneOffset.UTC);
+ }
+
+ private OffsetDateTime getTimestampLtz(Instant instant) {
+ return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
+ }
+
+ @Override
+ public Record copy() {
+ throw new UnsupportedOperationException("method copy is not
supported.");
+ }
+
+ @Override
+ public Record copy(Map<String, Object> overwriteValues) {
+ throw new UnsupportedOperationException("method copy is not
supported.");
+ }
+
+ @Override
+ public int size() {
+ return icebergSchema.asStruct().fields().size();
+ }
+
+ @Override
+ public <T> T get(int pos, Class<T> javaClass) {
+ Object value = get(pos);
+ if (value == null || javaClass.isInstance(value)) {
+ return javaClass.cast(value);
+ } else {
+ throw new IllegalStateException(
+ "Not an instance of " + javaClass.getName() + ": " +
value);
+ }
+ }
+
+ @Override
+ public <T> void set(int pos, T value) {
+ throw new UnsupportedOperationException("method set is not
supported.");
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
new file mode 100644
index 000000000..1c484743a
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.Configuration;
+
+import org.apache.iceberg.catalog.Catalog;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
+
+/** A provider for Iceberg catalog. */
+public class IcebergCatalogProvider implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private final Configuration icebergConfig;
+
+ public IcebergCatalogProvider(Configuration icebergConfig) {
+ this.icebergConfig = icebergConfig;
+ }
+
+ public Catalog get() {
+ Map<String, String> icebergProps = icebergConfig.toMap();
+ String catalogName = icebergProps.getOrDefault("name",
"fluss-iceberg-catalog");
+
+ return buildIcebergCatalog(
+ catalogName,
+ icebergProps,
+                // todo: currently an empty configuration; needs to be
initialized from env or Fluss
+                // configurations
+ new org.apache.hadoop.conf.Configuration());
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
new file mode 100644
index 000000000..938ebc477
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittable.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+/** The committable that derived from {@link IcebergWriteResult} to commit to
Iceberg. */
+public class IcebergCommittable implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final List<DataFile> dataFiles;
+ private final List<DeleteFile> deleteFiles;
+
+ private IcebergCommittable(List<DataFile> dataFiles, List<DeleteFile>
deleteFiles) {
+ this.dataFiles = dataFiles;
+ this.deleteFiles = deleteFiles;
+ }
+
+ public List<DataFile> getDataFiles() {
+ return dataFiles;
+ }
+
+ public List<DeleteFile> getDeleteFiles() {
+ return deleteFiles;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for {@link IcebergCommittable}, collecting {@link DataFile} and
{@link DeleteFile}
+ * entries.
+ */
+ public static class Builder {
+ private final List<DataFile> dataFiles = new ArrayList<>();
+ private final List<DeleteFile> deleteFiles = new ArrayList<>();
+
+ public Builder addDataFile(DataFile dataFile) {
+ this.dataFiles.add(dataFile);
+ return this;
+ }
+
+ public Builder addDeleteFile(DeleteFile deleteFile) {
+ this.deleteFiles.add(deleteFile);
+ return this;
+ }
+
+ public IcebergCommittable build() {
+ return new IcebergCommittable(new ArrayList<>(dataFiles), new
ArrayList<>(deleteFiles));
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "IcebergCommittable{"
+ + "dataFiles="
+ + dataFiles.size()
+ + ", deleteFiles="
+ + deleteFiles.size()
+ + '}';
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
new file mode 100644
index 000000000..0ec247a74
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergCommittableSerializer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+
+/** Serializer for {@link IcebergCommittable}. */
+public class IcebergCommittableSerializer implements
SimpleVersionedSerializer<IcebergCommittable> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergCommittable committable) throws IOException
{
+ return InstantiationUtils.serializeObject(committable);
+ }
+
+ @Override
+ public IcebergCommittable deserialize(int version, byte[] serialized)
throws IOException {
+ IcebergCommittable icebergCommittable;
+ try {
+ icebergCommittable =
+ InstantiationUtils.deserializeObject(serialized,
getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return icebergCommittable;
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
new file mode 100644
index 000000000..6353fb06c
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.metadata.TablePath;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.events.CreateSnapshotEvent;
+import org.apache.iceberg.events.Listener;
+import org.apache.iceberg.events.Listeners;
+import org.apache.iceberg.io.WriteResult;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static
com.alibaba.fluss.lake.writer.LakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** Implementation of {@link LakeCommitter} for Iceberg. */
+public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult,
IcebergCommittable> {
+
+ private final Catalog icebergCatalog;
+ private final Table icebergTable;
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
+
+ public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider,
TablePath tablePath)
+ throws IOException {
+ this.icebergCatalog = icebergCatalogProvider.get();
+ this.icebergTable = getTable(tablePath);
+ // register iceberg listener
+ Listeners.register(new IcebergSnapshotCreateListener(),
CreateSnapshotEvent.class);
+ }
+
+ @Override
+ public IcebergCommittable toCommittable(List<IcebergWriteResult>
icebergWriteResults) {
+ // Aggregate all write results into a single committable
+ IcebergCommittable.Builder builder = IcebergCommittable.builder();
+
+ for (IcebergWriteResult result : icebergWriteResults) {
+ WriteResult writeResult = result.getWriteResult();
+
+ // Add data files
+ for (DataFile dataFile : writeResult.dataFiles()) {
+ builder.addDataFile(dataFile);
+ }
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public long commit(IcebergCommittable committable, Map<String, String>
snapshotProperties)
+ throws IOException {
+ try {
+ // Refresh table to get latest metadata
+ icebergTable.refresh();
+ // Simple append-only case: only data files, no delete files or
compaction
+ AppendFiles appendFiles = icebergTable.newAppend();
+ for (DataFile dataFile : committable.getDataFiles()) {
+ appendFiles.appendFile(dataFile);
+ }
+ if (!committable.getDeleteFiles().isEmpty()) {
+ throw new IllegalStateException(
+ "Delete files are not supported in append-only mode. "
+ + "Found "
+ + committable.getDeleteFiles().size()
+ + " delete files.");
+ }
+
+ addFlussProperties(appendFiles, snapshotProperties);
+
+ appendFiles.commit();
+
+ Long commitSnapshotId = currentCommitSnapshotId.get();
+ currentCommitSnapshotId.remove();
+
+ return checkNotNull(
+ commitSnapshotId, "Iceberg committed snapshot id must be
non-null.");
+ } catch (Exception e) {
+ throw new IOException("Failed to commit to Iceberg table.", e);
+ }
+ }
+
+ private void addFlussProperties(
+ AppendFiles appendFiles, Map<String, String> snapshotProperties) {
+ appendFiles.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+ for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
+ appendFiles.set(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void abort(IcebergCommittable committable) {
+ List<String> filesToDelete =
+ committable.getDataFiles().stream()
+ .map(dataFile -> dataFile.path().toString())
+ .collect(Collectors.toList());
+ CatalogUtil.deleteFiles(icebergTable.io(), filesToDelete, "data file",
true);
+ }
+
+ @Nullable
+ @Override
+ public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long
latestLakeSnapshotIdOfFluss)
+ throws IOException {
+ // todo: may refactor to common methods?
+ Snapshot latestLakeSnapshot =
+
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+
+ if (latestLakeSnapshot == null) {
+ return null;
+ }
+
+ // Check if there's a gap between Fluss and Iceberg snapshots
+ if (latestLakeSnapshotIdOfFluss != null
+ && latestLakeSnapshot.snapshotId() <=
latestLakeSnapshotIdOfFluss) {
+ return null;
+ }
+
+ CommittedLakeSnapshot committedLakeSnapshot =
+ new CommittedLakeSnapshot(latestLakeSnapshot.snapshotId());
+
+ // Reconstruct bucket offsets from snapshot properties
+ Map<String, String> properties = latestLakeSnapshot.summary();
+ if (properties == null) {
+ throw new IOException(
+ "Failed to load committed lake snapshot properties from
Iceberg.");
+ }
+
+ String flussOffsetProperties =
properties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+ if (flussOffsetProperties == null) {
+ throw new IllegalArgumentException(
+ "Cannot resume tiering from snapshot without bucket offset
properties. "
+ + "The snapshot was committed to Iceberg but
missing Fluss metadata.");
+ }
+
+ for (JsonNode node : OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+ BucketOffset bucketOffset =
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+ if (bucketOffset.getPartitionId() != null) {
+ committedLakeSnapshot.addPartitionBucket(
+ bucketOffset.getPartitionId(),
+ bucketOffset.getPartitionQualifiedName(),
+ bucketOffset.getBucket(),
+ bucketOffset.getLogOffset());
+ } else {
+ committedLakeSnapshot.addBucket(
+ bucketOffset.getBucket(), bucketOffset.getLogOffset());
+ }
+ }
+
+ return committedLakeSnapshot;
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ if (icebergCatalog != null && icebergCatalog instanceof
AutoCloseable) {
+ ((AutoCloseable) icebergCatalog).close();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to close IcebergLakeCommitter.", e);
+ }
+ }
+
+ private Table getTable(TablePath tablePath) throws IOException {
+ try {
+ TableIdentifier tableId = toIceberg(tablePath);
+ return icebergCatalog.loadTable(tableId);
+ } catch (Exception e) {
+ throw new IOException("Failed to get table " + tablePath + " in
Iceberg.", e);
+ }
+ }
+
+ @Nullable
+ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) {
+ icebergTable.refresh();
+
+ // Find the latest snapshot committed by Fluss
+ Iterable<Snapshot> snapshots = icebergTable.snapshots();
+ Snapshot latestFlussSnapshot = null;
+
+ for (Snapshot snapshot : snapshots) {
+ Map<String, String> summary = snapshot.summary();
+ if (summary != null &&
commitUser.equals(summary.get("commit-user"))) {
+ if (latestFlussSnapshot == null
+ || snapshot.snapshotId() >
latestFlussSnapshot.snapshotId()) {
+ latestFlussSnapshot = snapshot;
+ }
+ }
+ }
+
+ return latestFlussSnapshot;
+ }
+
+    /** A {@link Listener} to listen to the Iceberg create-snapshot event. */
+ public static class IcebergSnapshotCreateListener implements
Listener<CreateSnapshotEvent> {
+ @Override
+ public void notify(CreateSnapshotEvent createSnapshotEvent) {
+ currentCommitSnapshotId.set(createSnapshotEvent.snapshotId());
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
similarity index 56%
copy from
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
copy to
fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
index 457009527..e3a101c4a 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.lake.paimon.tiering;
+package com.alibaba.fluss.lake.iceberg.tiering;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.committer.CommitterInitContext;
@@ -27,39 +27,37 @@ import com.alibaba.fluss.lake.writer.WriterInitContext;
import java.io.IOException;
-/** Implementation of {@link LakeTieringFactory} for Paimon . */
-public class PaimonLakeTieringFactory
- implements LakeTieringFactory<PaimonWriteResult, PaimonCommittable> {
-
- public static final String FLUSS_LAKE_TIERING_COMMIT_USER =
"__fluss_lake_tiering";
+/** Implementation of {@link LakeTieringFactory} for Iceberg. */
+public class IcebergLakeTieringFactory
+ implements LakeTieringFactory<IcebergWriteResult, IcebergCommittable> {
private static final long serialVersionUID = 1L;
- private final PaimonCatalogProvider paimonCatalogProvider;
+ private final IcebergCatalogProvider icebergCatalogProvider;
- public PaimonLakeTieringFactory(Configuration paimonConfig) {
- this.paimonCatalogProvider = new PaimonCatalogProvider(paimonConfig);
+ public IcebergLakeTieringFactory(Configuration icebergConfig) {
+ this.icebergCatalogProvider = new
IcebergCatalogProvider(icebergConfig);
}
@Override
- public LakeWriter<PaimonWriteResult> createLakeWriter(WriterInitContext
writerInitContext)
+ public LakeWriter<IcebergWriteResult> createLakeWriter(WriterInitContext
writerInitContext)
throws IOException {
- return new PaimonLakeWriter(paimonCatalogProvider, writerInitContext);
+ return new IcebergLakeWriter(icebergCatalogProvider,
writerInitContext);
}
@Override
- public SimpleVersionedSerializer<PaimonWriteResult>
getWriteResultSerializer() {
- return new PaimonWriteResultSerializer();
+ public SimpleVersionedSerializer<IcebergWriteResult>
getWriteResultSerializer() {
+ return new IcebergWriteResultSerializer();
}
@Override
- public LakeCommitter<PaimonWriteResult, PaimonCommittable>
createLakeCommitter(
+ public LakeCommitter<IcebergWriteResult, IcebergCommittable>
createLakeCommitter(
CommitterInitContext committerInitContext) throws IOException {
- return new PaimonLakeCommitter(paimonCatalogProvider,
committerInitContext.tablePath());
+ return new IcebergLakeCommitter(icebergCatalogProvider,
committerInitContext.tablePath());
}
@Override
- public SimpleVersionedSerializer<PaimonCommittable>
getCommittableSerializer() {
- return new PaimonCommittableSerializer();
+ public SimpleVersionedSerializer<IcebergCommittable>
getCommittableSerializer() {
+ return new IcebergCommittableSerializer();
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
new file mode 100644
index 000000000..e75378941
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergLakeWriter.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.iceberg.tiering.append.AppendOnlyWriter;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+
+/** Implementation of {@link LakeWriter} for Iceberg. */
+public class IcebergLakeWriter implements LakeWriter<IcebergWriteResult> {
+
+ private final Catalog icebergCatalog;
+ private final Table icebergTable;
+ private final RecordWriter recordWriter;
+
+ public IcebergLakeWriter(
+ IcebergCatalogProvider icebergCatalogProvider, WriterInitContext
writerInitContext)
+ throws IOException {
+ this.icebergCatalog = icebergCatalogProvider.get();
+ this.icebergTable = getTable(writerInitContext.tablePath());
+
+ // Create record writer based on table type
+ // For now, only supporting non-partitioned append-only tables
+ this.recordWriter = createRecordWriter(writerInitContext);
+ }
+
+ private RecordWriter createRecordWriter(WriterInitContext
writerInitContext) {
+ if (!icebergTable.spec().isUnpartitioned()) {
+ throw new UnsupportedOperationException("Partitioned tables are
not yet supported");
+ }
+
+ // For now, assume append-only (no primary keys)
+
+ return new AppendOnlyWriter(
+ icebergTable,
+ writerInitContext.schema().getRowType(),
+ writerInitContext.tableBucket(),
+ null, // No partition for non-partitioned table
+ Collections.emptyList() // No partition keys
+ );
+ }
+
+ @Override
+ public void write(LogRecord record) throws IOException {
+ try {
+ recordWriter.write(record);
+ } catch (Exception e) {
+ throw new IOException("Failed to write Fluss record to Iceberg.",
e);
+ }
+ }
+
+ @Override
+ public IcebergWriteResult complete() throws IOException {
+ try {
+ WriteResult writeResult = recordWriter.complete();
+ return new IcebergWriteResult(writeResult);
+ } catch (Exception e) {
+ throw new IOException("Failed to complete Iceberg write.", e);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (recordWriter != null) {
+ recordWriter.close();
+ }
+ if (icebergCatalog != null && icebergCatalog instanceof Closeable)
{
+ ((Closeable) icebergCatalog).close();
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to close IcebergLakeWriter.", e);
+ }
+ }
+
+ private Table getTable(TablePath tablePath) throws IOException {
+ try {
+ return icebergCatalog.loadTable(toIceberg(tablePath));
+ } catch (Exception e) {
+ throw new IOException("Failed to get table " + tablePath + " in
Iceberg.", e);
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
new file mode 100644
index 000000000..bc0c0605f
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.Serializable;
+
+/** The write result of the Iceberg lake writer, passed to the committer to commit. */
+public class IcebergWriteResult implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private final WriteResult writeResult;
+
+ public IcebergWriteResult(WriteResult writeResult) {
+ this.writeResult = writeResult;
+ }
+
+ public WriteResult getWriteResult() {
+ return writeResult;
+ }
+
+ @Override
+ public String toString() {
+ return "IcebergWriteResult{"
+ + "dataFiles="
+ + writeResult.dataFiles().length
+ + ", deleteFiles="
+ + writeResult.deleteFiles().length
+ + '}';
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
new file mode 100644
index 000000000..86c6e6849
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergWriteResultSerializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.utils.InstantiationUtils;
+
+import org.apache.iceberg.io.WriteResult;
+
+import java.io.IOException;
+
+/** Serializer for {@link IcebergWriteResult}. */
+public class IcebergWriteResultSerializer implements
SimpleVersionedSerializer<IcebergWriteResult> {
+
+ private static final int CURRENT_VERSION = 1;
+
+ @Override
+ public int getVersion() {
+ return CURRENT_VERSION;
+ }
+
+ @Override
+ public byte[] serialize(IcebergWriteResult icebergWriteResult) throws
IOException {
+ return
InstantiationUtils.serializeObject(icebergWriteResult.getWriteResult());
+ }
+
+ @Override
+ public IcebergWriteResult deserialize(int version, byte[] serialized)
throws IOException {
+ WriteResult writeResult;
+ try {
+ writeResult =
+ InstantiationUtils.deserializeObject(serialized,
getClass().getClassLoader());
+ } catch (ClassNotFoundException e) {
+ throw new IOException(e);
+ }
+ return new IcebergWriteResult(writeResult);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
new file mode 100644
index 000000000..d6cde3394
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/RecordWriter.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.WriteResult;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** A base class to write {@link LogRecord} to Iceberg. */
+public abstract class RecordWriter implements AutoCloseable {
+
+ protected final TaskWriter<Record> taskWriter;
+ protected final Schema icebergSchema;
+ protected final int bucket;
+ @Nullable protected final String partition;
+ protected final FlussRecordAsIcebergRecord flussRecordAsIcebergRecord;
+
+ public RecordWriter(
+ TaskWriter<Record> taskWriter,
+ Schema icebergSchema,
+ RowType flussRowType,
+ TableBucket tableBucket,
+ @Nullable String partition,
+ List<String> partitionKeys) {
+ this.taskWriter = taskWriter;
+ this.icebergSchema = icebergSchema;
+ this.bucket = tableBucket.getBucket();
+ this.partition = partition; // null for non-partitioned tables
+ this.flussRecordAsIcebergRecord =
+ new FlussRecordAsIcebergRecord(
+ tableBucket.getBucket(), icebergSchema, flussRowType);
+ }
+
+ public abstract void write(LogRecord record) throws Exception;
+
+ public WriteResult complete() throws Exception {
+ // Complete the task writer and get write result
+ return taskWriter.complete();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (taskWriter != null) {
+ taskWriter.close();
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
new file mode 100644
index 000000000..9c5607477
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/tiering/append/AppendOnlyWriter.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering.append;
+
+import com.alibaba.fluss.lake.iceberg.tiering.RecordWriter;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.io.UnpartitionedWriter;
+import org.apache.iceberg.util.PropertyUtil;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static
org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+/** A {@link RecordWriter} to write to Iceberg's append-only table. */
+public class AppendOnlyWriter extends RecordWriter {
+
+ public AppendOnlyWriter(
+ Table icebergTable,
+ RowType flussRowType,
+ TableBucket tableBucket,
+ @Nullable String partition,
+ List<String> partitionKeys) {
+ super(
+ createTaskWriter(icebergTable, tableBucket),
+ icebergTable.schema(),
+ flussRowType,
+ tableBucket,
+ partition,
+ partitionKeys);
+ }
+
+ private static TaskWriter<Record> createTaskWriter(
+ Table icebergTable, TableBucket tableBucket) {
+ // Get target file size from table properties
+ long targetFileSize = targetFileSize(icebergTable);
+
+ FileAppenderFactory<Record> fileAppenderFactory =
+ new GenericAppenderFactory(icebergTable.schema());
+ FileFormat format = fileFormat(icebergTable);
+ OutputFileFactory outputFileFactory =
+ OutputFileFactory.builderFor(
+ icebergTable,
+ tableBucket.getBucket(),
+ // task id always 0
+ 0)
+ .format(format)
+ .build();
+
+ return new UnpartitionedWriter<>(
+ icebergTable.spec(),
+ format,
+ fileAppenderFactory,
+ outputFileFactory,
+ icebergTable.io(),
+ targetFileSize);
+ }
+
+ @Override
+ public void write(LogRecord record) throws Exception {
+ flussRecordAsIcebergRecord.setFlussRecord(record);
+ taskWriter.write(flussRecordAsIcebergRecord);
+ }
+
+ private static FileFormat fileFormat(Table icebergTable) {
+ String formatString =
+ PropertyUtil.propertyAsString(
+ icebergTable.properties(),
+ TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.fromString(formatString);
+ }
+
+ private static long targetFileSize(Table icebergTable) {
+ return PropertyUtil.propertyAsLong(
+ icebergTable.properties(),
+ WRITE_TARGET_FILE_SIZE_BYTES,
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
new file mode 100644
index 000000000..de237c7bf
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/utils/IcebergConversions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.utils;
+
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.iceberg.catalog.TableIdentifier;
+
+/** Utility class for static conversions between Fluss and Iceberg types. */
+public class IcebergConversions {
+
+ /** Convert Fluss TablePath to Iceberg TableIdentifier. */
+ public static TableIdentifier toIceberg(TablePath tablePath) {
+ return TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index e2450b80e..393f5a17e 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -1,11 +1,12 @@
/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
new file mode 100644
index 000000000..e8455f0df
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.committer.LakeCommitter;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergCommittable;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
+import com.alibaba.fluss.lake.iceberg.tiering.IcebergWriteResult;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.writer.LakeWriter;
+import com.alibaba.fluss.lake.writer.WriterInitContext;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for tiering to Iceberg via {@link IcebergLakeTieringFactory}. */
+class IcebergTieringTest {
+
+ private @TempDir File tempWarehouseDir;
+ private IcebergLakeTieringFactory icebergLakeTieringFactory;
+ private Catalog icebergCatalog;
+
+ @BeforeEach
+ void beforeEach() {
+ Configuration configuration = new Configuration();
+ configuration.setString("warehouse", "file://" + tempWarehouseDir);
+ configuration.setString("type", "hadoop");
+ configuration.setString("name", "test");
+ IcebergCatalogProvider provider = new
IcebergCatalogProvider(configuration);
+ icebergCatalog = provider.get();
+
+ icebergLakeTieringFactory = new
IcebergLakeTieringFactory(configuration);
+ }
+
+ @Test
+ void testTieringWriteTable() throws Exception {
+ TablePath tablePath = TablePath.of("iceberg", "test_table");
+ createTable(tablePath);
+
+ Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+
+ int bucketNum = 3;
+
+ Map<Integer, List<LogRecord>> recordsByBucket = new HashMap<>();
+
+ List<IcebergWriteResult> icebergWriteResults = new ArrayList<>();
+ SimpleVersionedSerializer<IcebergWriteResult> writeResultSerializer =
+ icebergLakeTieringFactory.getWriteResultSerializer();
+ SimpleVersionedSerializer<IcebergCommittable> committableSerializer =
+ icebergLakeTieringFactory.getCommittableSerializer();
+
+ // first, write data
+ for (int bucket = 0; bucket < bucketNum; bucket++) {
+ try (LakeWriter<IcebergWriteResult> writer =
createLakeWriter(tablePath, bucket)) {
+ List<LogRecord> records = genLogTableRecords(bucket, 5);
+ for (LogRecord record : records) {
+ writer.write(record);
+ }
+ recordsByBucket.put(bucket, records);
+ IcebergWriteResult result = writer.complete();
+ byte[] serialized = writeResultSerializer.serialize(result);
+ icebergWriteResults.add(
+ writeResultSerializer.deserialize(
+ writeResultSerializer.getVersion(),
serialized));
+ }
+ }
+
+ // second, commit data
+ try (LakeCommitter<IcebergWriteResult, IcebergCommittable>
lakeCommitter =
+ createLakeCommitter(tablePath)) {
+ // serialize/deserialize committable
+ IcebergCommittable icebergCommittable =
+ lakeCommitter.toCommittable(icebergWriteResults);
+ byte[] serialized =
committableSerializer.serialize(icebergCommittable);
+ icebergCommittable =
+ committableSerializer.deserialize(
+ committableSerializer.getVersion(), serialized);
+ long snapshot =
+ lakeCommitter.commit(icebergCommittable,
Collections.singletonMap("k1", "v1"));
+ icebergTable.refresh();
+ Snapshot icebergSnapshot = icebergTable.currentSnapshot();
+ assertThat(snapshot).isEqualTo(icebergSnapshot.snapshotId());
+ assertThat(icebergSnapshot.summary()).containsEntry("k1", "v1");
+ }
+
+ // then, check data
+ for (int bucket = 0; bucket < 3; bucket++) {
+ List<LogRecord> expectRecords = recordsByBucket.get(bucket);
+ CloseableIterator<Record> actualRecords =
getIcebergRows(icebergTable, bucket);
+ verifyLogTableRecords(actualRecords, bucket, expectRecords);
+ }
+ }
+
+ private LakeWriter<IcebergWriteResult> createLakeWriter(TablePath
tablePath, int bucket)
+ throws IOException {
+ return icebergLakeTieringFactory.createLakeWriter(
+ new WriterInitContext() {
+ @Override
+ public TablePath tablePath() {
+ return tablePath;
+ }
+
+ @Override
+ public TableBucket tableBucket() {
+ return new TableBucket(0, null, bucket);
+ }
+
+ @Nullable
+ @Override
+ public String partition() {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> customProperties() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public com.alibaba.fluss.metadata.Schema schema() {
+ return com.alibaba.fluss.metadata.Schema.newBuilder()
+ .column("c1", DataTypes.INT())
+ .column("c2", DataTypes.STRING())
+ .column("c3", DataTypes.STRING())
+ .build();
+ }
+ });
+ }
+
+ private LakeCommitter<IcebergWriteResult, IcebergCommittable>
createLakeCommitter(
+ TablePath tablePath) throws IOException {
+ return icebergLakeTieringFactory.createLakeCommitter(() -> tablePath);
+ }
+
+ private List<LogRecord> genLogTableRecords(int bucket, int numRecords) {
+ List<LogRecord> logRecords = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ GenericRow genericRow = new GenericRow(3);
+ genericRow.setField(0, i);
+ genericRow.setField(1, BinaryString.fromString("bucket" + bucket +
"_" + i));
+ genericRow.setField(2, BinaryString.fromString("bucket" + bucket));
+
+ LogRecord logRecord =
+ new GenericRecord(
+ i, System.currentTimeMillis(),
ChangeType.APPEND_ONLY, genericRow);
+ logRecords.add(logRecord);
+ }
+ return logRecords;
+ }
+
+ private void createTable(TablePath tablePath) throws Exception {
+ Namespace namespace = Namespace.of(tablePath.getDatabaseName());
+ if (icebergCatalog instanceof SupportsNamespaces) {
+ SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
+ if (!ns.namespaceExists(namespace)) {
+ ns.createNamespace(namespace);
+ }
+ }
+
+ org.apache.iceberg.Schema schema =
+ new org.apache.iceberg.Schema(
+ Types.NestedField.optional(1, "c1",
Types.IntegerType.get()),
+ Types.NestedField.optional(2, "c2",
Types.StringType.get()),
+ Types.NestedField.optional(3, "c3",
Types.StringType.get()),
+ Types.NestedField.required(4, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
+ Types.NestedField.required(5, OFFSET_COLUMN_NAME,
Types.LongType.get()),
+ Types.NestedField.required(
+ 6, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone()));
+
+ TableIdentifier tableId =
+ TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
+ icebergCatalog.createTable(tableId, schema);
+ }
+
+ private CloseableIterator<Record> getIcebergRows(Table table, int bucket) {
+ return IcebergGenerics.read(table)
+ .where(Expressions.equal(BUCKET_COLUMN_NAME, bucket))
+ .build()
+ .iterator();
+ }
+
+ private void verifyLogTableRecords(
+ CloseableIterator<Record> actualRecords,
+ int expectBucket,
+ List<LogRecord> expectRecords) {
+ for (LogRecord expectRecord : expectRecords) {
+ Record actualRecord = actualRecords.next();
+ // check business columns:
+
assertThat(actualRecord.get(0)).isEqualTo(expectRecord.getRow().getInt(0));
+ assertThat(actualRecord.get(1, String.class))
+ .isEqualTo(expectRecord.getRow().getString(1).toString());
+ assertThat(actualRecord.get(2, String.class))
+ .isEqualTo(expectRecord.getRow().getString(2).toString());
+ // check system columns: __bucket, __offset, __timestamp
+ assertThat(actualRecord.get(3)).isEqualTo(expectBucket);
+
assertThat(actualRecord.get(4)).isEqualTo(expectRecord.logOffset());
+ assertThat(
+ actualRecord
+ .get(5, OffsetDateTime.class)
+ .atZoneSameInstant(ZoneOffset.UTC)
+ .toInstant()
+ .toEpochMilli())
+ .isEqualTo(expectRecord.timestamp());
+ }
+ }
+}
diff --git
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
index 693ee63cc..7ece859bd 100644
---
a/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
+++
b/fluss-lake/fluss-lake-lance/src/main/resources/META-INF/services/com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin
@@ -1,11 +1,13 @@
#
-# Copyright (c) 2025 Alibaba Group Holding Ltd.
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 08bcae786..5fbe54c7e 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -55,7 +55,6 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
private final Catalog paimonCatalog;
private final FileStoreTable fileStoreTable;
private FileStoreCommit fileStoreCommit;
- private final TablePath tablePath;
private static final ThreadLocal<Long> currentCommitSnapshotId = new
ThreadLocal<>();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -63,7 +62,6 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
throws IOException {
this.paimonCatalog = paimonCatalogProvider.get();
this.fileStoreTable = getTable(tablePath);
- this.tablePath = tablePath;
}
@Override
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
index 457009527..cc8c45e4d 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeTieringFactory.java
@@ -31,8 +31,6 @@ import java.io.IOException;
public class PaimonLakeTieringFactory
implements LakeTieringFactory<PaimonWriteResult, PaimonCommittable> {
- public static final String FLUSS_LAKE_TIERING_COMMIT_USER =
"__fluss_lake_tiering";
-
private static final long serialVersionUID = 1L;
private final PaimonCatalogProvider paimonCatalogProvider;
diff --git
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
index ca1cb2757..a25e335cb 100644
---
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
+++
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/fetcher/RemoteLeaderEndpointTest.java
@@ -1,11 +1,12 @@
/*
- * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
diff --git a/website/docs/quickstart/security.md
b/website/docs/quickstart/security.md
index e3356e927..44955be67 100644
--- a/website/docs/quickstart/security.md
+++ b/website/docs/quickstart/security.md
@@ -4,11 +4,13 @@ sidebar_position: 2
---
<!--
- Copyright (c) 2025 Alibaba Group Holding Ltd.
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0