This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1f4ad5b1e [lake/iceberg] Support iceberg log table union read (#1684)
1f4ad5b1e is described below

commit 1f4ad5b1ef05ad62da0e6dec7e409d81536a743a
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Sep 15 19:05:10 2025 +0800

    [lake/iceberg] Support iceberg log table union read (#1684)
    
    ---------
    
    Co-authored-by: luoyuxia <[email protected]>
---
 .../apache/fluss/flink/utils/DataLakeUtils.java    |   7 -
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  14 +-
 .../fluss/lake/iceberg/IcebergLakeStorage.java     |   6 +-
 .../lake/iceberg/source/IcebergLakeSource.java     |  85 ++++++++
 .../iceberg/source/IcebergRecordAsFlussRow.java    |   3 +-
 .../lake/iceberg/source/IcebergRecordReader.java   | 136 ++++++++++++
 .../fluss/lake/iceberg/source/IcebergSplit.java    |  67 ++++++
 .../lake/iceberg/source/IcebergSplitPlanner.java   | 125 +++++++++++
 .../iceberg/source/IcebergSplitSerializer.java     |  51 +++++
 .../iceberg/tiering/IcebergCatalogProvider.java    |  12 +-
 .../IcebergCatalogUtils.java}                      |  22 +-
 .../flink/FlinkUnionReadLogTableITCase.java        | 196 ++++++-----------
 .../lake/iceberg/flink/FlinkUnionReadTestBase.java |  58 +++++
 .../source/IcebergRecordAsFlussRowTest.java        |   6 +-
 .../iceberg/source/IcebergRecordReaderTest.java    | 239 +++++++++++++++++++++
 .../lake/iceberg/source/IcebergSourceTestBase.java | 159 ++++++++++++++
 .../iceberg/source/IcebergSplitPlannerTest.java    | 147 +++++++++++++
 .../iceberg/source/IcebergSplitSerializerTest.java |  88 ++++++++
 .../testutils/FlinkIcebergTieringTestBase.java     |  29 +++
 .../lake/iceberg/utils/IcebergConversionsTest.java |   9 +-
 .../fluss/lake/paimon/source/PaimonLakeSource.java |   1 +
 .../paimon/flink/FlinkUnionReadLogTableITCase.java |   4 +-
 fluss-test-coverage/pom.xml                        |   1 +
 23 files changed, 1269 insertions(+), 196 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
index b4d82109a..7cc09a139 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/DataLakeUtils.java
@@ -42,13 +42,6 @@ public class DataLakeUtils {
                             ConfigOptions.TABLE_DATALAKE_FORMAT.key()));
         }
 
-        if (datalakeFormat != DataLakeFormat.PAIMON) {
-            throw new UnsupportedOperationException(
-                    String.format(
-                            "The datalake format %s " + " is not supported. 
Only %s is supported.",
-                            datalakeFormat, DataLakeFormat.PAIMON));
-        }
-
         // currently, extract datalake catalog config
         String dataLakePrefix = "table.datalake." + datalakeFormat + ".";
         return extractAndRemovePrefix(tableOptions.toMap(), dataLakePrefix);
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 1f3a2b8cb..842fcddde 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -20,7 +20,7 @@ package org.apache.fluss.lake.iceberg;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.TableAlreadyExistException;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
@@ -49,13 +49,10 @@ import java.util.Set;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 
 /** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {
 
-    public static final String ICEBERG_CATALOG_DEFAULT_NAME = 
"fluss-iceberg-catalog";
-
     public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
 
     static {
@@ -74,7 +71,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
     private static final String ICEBERG_CONF_PREFIX = "iceberg.";
 
     public IcebergLakeCatalog(Configuration configuration) {
-        this.icebergCatalog = createIcebergCatalog(configuration);
+        this.icebergCatalog = 
IcebergCatalogUtils.createIcebergCatalog(configuration);
     }
 
     @VisibleForTesting
@@ -82,13 +79,6 @@ public class IcebergLakeCatalog implements LakeCatalog {
         return icebergCatalog;
     }
 
-    private Catalog createIcebergCatalog(Configuration configuration) {
-        Map<String, String> icebergProps = configuration.toMap();
-        String catalogName = icebergProps.getOrDefault("name", 
ICEBERG_CATALOG_DEFAULT_NAME);
-        return buildIcebergCatalog(
-                catalogName, icebergProps, 
IcebergConfiguration.from(configuration).get());
-    }
-
     @Override
     public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
             throws TableAlreadyExistException {
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
index fc3bf97c7..fd631d252 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,6 +18,8 @@
 package org.apache.fluss.lake.iceberg;
 
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.source.IcebergLakeSource;
+import org.apache.fluss.lake.iceberg.source.IcebergSplit;
 import org.apache.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
 import org.apache.fluss.lake.lakestorage.LakeStorage;
 import org.apache.fluss.lake.source.LakeSource;
@@ -44,7 +46,7 @@ public class IcebergLakeStorage implements LakeStorage {
     }
 
     @Override
-    public LakeSource<?> createLakeSource(TablePath tablePath) {
-        throw new UnsupportedOperationException("Not implemented");
+    public LakeSource<IcebergSplit> createLakeSource(TablePath tablePath) {
+        return new IcebergLakeSource(icebergConfig, tablePath);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
new file mode 100644
index 000000000..31eee6fb5
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergLakeSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
+
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+
+/** Iceberg lake source. */
+public class IcebergLakeSource implements LakeSource<IcebergSplit> {
+    private static final long serialVersionUID = 1L;
+    private final Configuration icebergConfig;
+    private final TablePath tablePath;
+    private @Nullable int[][] project;
+
+    public IcebergLakeSource(Configuration icebergConfig, TablePath tablePath) 
{
+        this.icebergConfig = icebergConfig;
+        this.tablePath = tablePath;
+    }
+
+    @Override
+    public void withProject(int[][] project) {
+        this.project = project;
+    }
+
+    @Override
+    public void withLimit(int limit) {
+        throw new UnsupportedOperationException("Not impl.");
+    }
+
+    @Override
+    public FilterPushDownResult withFilters(List<Predicate> predicates) {
+        // TODO: Support filter push down. #1676
+        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+    }
+
+    @Override
+    public Planner<IcebergSplit> createPlanner(PlannerContext context) throws 
IOException {
+        return new IcebergSplitPlanner(icebergConfig, tablePath, 
context.snapshotId());
+    }
+
+    @Override
+    public RecordReader createRecordReader(ReaderContext<IcebergSplit> 
context) throws IOException {
+        Catalog catalog = 
IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+        Table table = catalog.loadTable(toIceberg(tablePath));
+        return new IcebergRecordReader(context.lakeSplit().fileScanTask(), 
table, project);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<IcebergSplit> getSplitSerializer() {
+        return new IcebergSplitSerializer();
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
index da9c564e1..7c7da6c51 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRow.java
@@ -41,8 +41,9 @@ public class IcebergRecordAsFlussRow implements InternalRow {
 
     public IcebergRecordAsFlussRow() {}
 
-    public void setIcebergRecord(Record icebergRecord) {
+    public IcebergRecordAsFlussRow replaceIcebergRecord(Record icebergRecord) {
         this.icebergRecord = icebergRecord;
+        return this;
     }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
new file mode 100644
index 000000000..d848ec152
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReader.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.GenericRecord;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Iceberg record reader. */
+public class IcebergRecordReader implements RecordReader {
+    protected IcebergRecordAsFlussRecordIterator iterator;
+    protected @Nullable int[][] project;
+    protected Types.StructType struct;
+
+    public IcebergRecordReader(FileScanTask fileScanTask, Table table, 
@Nullable int[][] project) {
+        TableScan tableScan = table.newScan();
+        if (project != null) {
+            tableScan = applyProject(tableScan, project);
+        }
+        IcebergGenericReader reader = new IcebergGenericReader(tableScan, 
true);
+        struct = tableScan.schema().asStruct();
+        this.iterator = new 
IcebergRecordAsFlussRecordIterator(reader.open(fileScanTask), struct);
+    }
+
+    @Override
+    public CloseableIterator<LogRecord> read() throws IOException {
+        return iterator;
+    }
+
+    private TableScan applyProject(TableScan tableScan, int[][] projects) {
+        Types.StructType structType = tableScan.schema().asStruct();
+        List<Types.NestedField> cols = new ArrayList<>(projects.length + 2);
+
+        for (int[] project : projects) {
+            cols.add(structType.fields().get(project[0]));
+        }
+
+        cols.add(structType.field(OFFSET_COLUMN_NAME));
+        cols.add(structType.field(TIMESTAMP_COLUMN_NAME));
+        return tableScan.project(new Schema(cols));
+    }
+
+    /** Iterator for iceberg record as fluss record. */
+    public static class IcebergRecordAsFlussRecordIterator implements 
CloseableIterator<LogRecord> {
+
+        private final org.apache.iceberg.io.CloseableIterator<Record> 
icebergRecordIterator;
+
+        private final ProjectedRow projectedRow;
+        private final IcebergRecordAsFlussRow icebergRecordAsFlussRow;
+
+        private final int logOffsetColIndex;
+        private final int timestampColIndex;
+
+        public IcebergRecordAsFlussRecordIterator(
+                CloseableIterable<Record> icebergRecordIterator, 
Types.StructType struct) {
+            this.icebergRecordIterator = icebergRecordIterator.iterator();
+            this.logOffsetColIndex = 
struct.fields().indexOf(struct.field(OFFSET_COLUMN_NAME));
+            this.timestampColIndex = 
struct.fields().indexOf(struct.field(TIMESTAMP_COLUMN_NAME));
+
+            int[] project = IntStream.range(0, struct.fields().size() - 
2).toArray();
+            projectedRow = ProjectedRow.from(project);
+            icebergRecordAsFlussRow = new IcebergRecordAsFlussRow();
+        }
+
+        @Override
+        public void close() {
+            try {
+                icebergRecordIterator.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Fail to close iterator.", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return icebergRecordIterator.hasNext();
+        }
+
+        @Override
+        public LogRecord next() {
+            Record icebergRecord = icebergRecordIterator.next();
+            long offset = icebergRecord.get(logOffsetColIndex, Long.class);
+            long timestamp =
+                    icebergRecord
+                            .get(timestampColIndex, OffsetDateTime.class)
+                            .toInstant()
+                            .toEpochMilli();
+
+            return new GenericRecord(
+                    offset,
+                    timestamp,
+                    ChangeType.INSERT,
+                    projectedRow.replaceRow(
+                            
icebergRecordAsFlussRow.replaceIcebergRecord(icebergRecord)));
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
new file mode 100644
index 000000000..65cc10a09
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplit.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSplit;
+
+import org.apache.iceberg.FileScanTask;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Split for Iceberg table. */
+public class IcebergSplit implements LakeSplit, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final FileScanTask fileScanTask;
+    private final int bucket;
+    private final List<String> partition;
+
+    public IcebergSplit(FileScanTask fileScanTask, int bucket, List<String> 
partition) {
+        this.fileScanTask = fileScanTask;
+        this.bucket = bucket;
+        this.partition = partition;
+    }
+
+    @Override
+    public int bucket() {
+        return bucket;
+    }
+
+    @Override
+    public List<String> partition() {
+        return partition;
+    }
+
+    public FileScanTask fileScanTask() {
+        return fileScanTask;
+    }
+
+    @Override
+    public String toString() {
+        return "IcebergSplit{"
+                + "task="
+                + fileScanTask
+                + ", bucket="
+                + bucket
+                + ", partition="
+                + partition
+                + '}';
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
new file mode 100644
index 000000000..89a5dc269
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlanner.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.lake.source.Planner;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+
+/** Iceberg split planner. */
+public class IcebergSplitPlanner implements Planner<IcebergSplit> {
+
+    private final Configuration icebergConfig;
+    private final TablePath tablePath;
+    private final long snapshotId;
+
+    public IcebergSplitPlanner(Configuration icebergConfig, TablePath 
tablePath, long snapshotId) {
+        this.icebergConfig = icebergConfig;
+        this.tablePath = tablePath;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public List<IcebergSplit> plan() throws IOException {
+        List<IcebergSplit> splits = new ArrayList<>();
+        Catalog catalog = 
IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
+        Table table = catalog.loadTable(toIceberg(tablePath));
+        Function<FileScanTask, List<String>> partitionExtract = 
createPartitionExtractor(table);
+        Function<FileScanTask, Integer> bucketExtractor = 
createBucketExtractor(table);
+        try (CloseableIterable<FileScanTask> tasks =
+                table.newScan()
+                        .useSnapshot(snapshotId)
+                        .includeColumnStats()
+                        .ignoreResiduals()
+                        .planFiles()) {
+            tasks.forEach(
+                    task ->
+                            splits.add(
+                                    new IcebergSplit(
+                                            task,
+                                            bucketExtractor.apply(task),
+                                            partitionExtract.apply(task))));
+        }
+        return splits;
+    }
+
+    private Function<FileScanTask, Integer> createBucketExtractor(Table table) 
{
+        PartitionSpec partitionSpec = table.spec();
+        List<PartitionField> partitionFields = partitionSpec.fields();
+
+        // the last one must be partition by fluss bucket
+        PartitionField bucketField = 
partitionFields.get(partitionFields.size() - 1);
+
+        if (table.schema()
+                .asStruct()
+                .field(bucketField.sourceId())
+                .name()
+                .equals(BUCKET_COLUMN_NAME)) {
+            // partition by __bucket column, should be fluss log table without 
bucket key,
+            // we don't care about the bucket since it's bucket un-aware
+            return task -> -1;
+        } else {
+            int bucketFieldIndex = partitionFields.size() - 1;
+            return task -> task.file().partition().get(bucketFieldIndex, 
Integer.class);
+        }
+    }
+
+    private Function<FileScanTask, List<String>> 
createPartitionExtractor(Table table) {
+        PartitionSpec partitionSpec = table.spec();
+        List<PartitionField> partitionFields = partitionSpec.fields();
+
+        // if only one partition, it must not be partitioned table since we 
will always use
+        // partition by fluss bucket
+        if (partitionSpec.fields().size() <= 1) {
+            return task -> Collections.emptyList();
+        } else {
+            List<Integer> partitionFieldIndices =
+                    // since will always first partition by fluss partition 
columns, then fluss
+                    // bucket,
+                    // just ignore the last partition column of iceberg
+                    IntStream.range(0, partitionFields.size() - 1)
+                            .boxed()
+                            .collect(Collectors.toList());
+            return task ->
+                    partitionFieldIndices.stream()
+                            // since currently, only string partition is 
supported
+                            .map(index -> task.partition().get(index, 
String.class))
+                            .collect(Collectors.toList());
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
new file mode 100644
index 000000000..507f49d64
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
+import org.apache.fluss.utils.InstantiationUtils;
+
+import java.io.IOException;
+
+/** Serializer for Iceberg split. */
+public class IcebergSplitSerializer implements 
SimpleVersionedSerializer<IcebergSplit> {
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(IcebergSplit icebergSplit) throws IOException {
+        return InstantiationUtils.serializeObject(icebergSplit);
+    }
+
+    @Override
+    public IcebergSplit deserialize(int version, byte[] serialized) throws 
IOException {
+        IcebergSplit icebergSplit;
+        try {
+            icebergSplit =
+                    InstantiationUtils.deserializeObject(serialized, 
getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+        return icebergSplit;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
index 3c53918ec..db468035f 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
@@ -18,15 +18,11 @@
 package org.apache.fluss.lake.iceberg.tiering;
 
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
 
 import org.apache.iceberg.catalog.Catalog;
 
 import java.io.Serializable;
-import java.util.Map;
-
-import static 
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 
 /** A provider for Iceberg catalog. */
 public class IcebergCatalogProvider implements Serializable {
@@ -39,10 +35,6 @@ public class IcebergCatalogProvider implements Serializable {
     }
 
     public Catalog get() {
-        Map<String, String> icebergProps = icebergConfig.toMap();
-        String catalogName = icebergProps.getOrDefault("name", 
ICEBERG_CATALOG_DEFAULT_NAME);
-
-        return buildIcebergCatalog(
-                catalogName, icebergProps, 
IcebergConfiguration.from(icebergConfig).get());
+        return IcebergCatalogUtils.createIcebergCatalog(icebergConfig);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
similarity index 67%
copy from 
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
copy to 
fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
index 3c53918ec..8d91fef14 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergCatalogProvider.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/utils/IcebergCatalogUtils.java
@@ -15,34 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.lake.iceberg.tiering;
+package org.apache.fluss.lake.iceberg.utils;
 
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
 
 import org.apache.iceberg.catalog.Catalog;
 
-import java.io.Serializable;
 import java.util.Map;
 
-import static 
org.apache.fluss.lake.iceberg.IcebergLakeCatalog.ICEBERG_CATALOG_DEFAULT_NAME;
 import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 
-/** A provider for Iceberg catalog. */
-public class IcebergCatalogProvider implements Serializable {
+/** Iceberg catalog utils. */
+public class IcebergCatalogUtils {
 
-    private static final long serialVersionUID = 1L;
-    private final Configuration icebergConfig;
+    public static final String ICEBERG_CATALOG_DEFAULT_NAME = 
"fluss-iceberg-catalog";
 
-    public IcebergCatalogProvider(Configuration icebergConfig) {
-        this.icebergConfig = icebergConfig;
-    }
-
-    public Catalog get() {
-        Map<String, String> icebergProps = icebergConfig.toMap();
+    public static Catalog createIcebergCatalog(Configuration configuration) {
+        Map<String, String> icebergProps = configuration.toMap();
         String catalogName = icebergProps.getOrDefault("name", 
ICEBERG_CATALOG_DEFAULT_NAME);
-
         return buildIcebergCatalog(
-                catalogName, icebergProps, 
IcebergConfiguration.from(icebergConfig).get());
+                catalogName, icebergProps, 
IcebergConfiguration.from(configuration).get());
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
similarity index 61%
copy from 
fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
copy to 
fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
index 987bc0451..5b07b68f4 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadLogTableITCase.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,49 +16,42 @@
  * limitations under the License.
  */
 
-package org.apache.fluss.lake.paimon.flink;
+package org.apache.fluss.lake.iceberg.flink;
 
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
 
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.SavepointFormatType;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nullable;
 
-import java.io.File;
+import java.time.Duration;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
-import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** The IT case for Flink union data in lake and fluss for log table. */
-class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
-
-    @TempDir public static File savepointDir;
-
+/** Test union read log table with full data types. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
     @BeforeAll
     protected static void beforeAll() {
         FlinkUnionReadTestBase.beforeAll();
@@ -77,22 +71,22 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         // wait until records has been synced
         waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
 
-        // now, start to read the log table, which will read paimon
-        // may read fluss or not, depends on the log offset of paimon snapshot
+        // now, start to read the log table, which will read iceberg
+        // may read fluss or not, depending on the log offset of the iceberg snapshot
         List<Row> actual =
                 CollectionUtil.iteratorToList(
                         batchTEnv.executeSql("select * from " + 
tableName).collect());
 
         assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
 
-        // can database sync job
+        // cancel the tiering job
         jobClient.cancel().get();
 
         // write some log data again
         writtenRows.addAll(writeRows(t1, 3, isPartitioned));
 
         // query the log table again and check the data
-        // it should read both paimon snapshot and fluss log
+        // it should read both iceberg snapshot and fluss log
         actual =
                 CollectionUtil.iteratorToList(
                         batchTEnv.executeSql("select * from " + 
tableName).collect());
@@ -117,9 +111,10 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
             String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
 
             // check if the plan contains partition filter
+            // TODO: push down iceberg partition filter
             assertThat(plan)
                     .contains("TableSourceScan(")
-                    .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
+                    .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + 
partition + "'");
 
             List<Row> expectedFiltered =
                     writtenRows.stream()
@@ -134,114 +129,6 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         }
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
-        // first of all, start tiering
-        JobClient jobClient = buildTieringJob(execEnv);
-
-        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
-
-        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
-        List<Row> writtenRows = new LinkedList<>();
-        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
-        // wait until records has been synced
-        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
-
-        // now, start to read the log table, which will read paimon
-        // may read fluss or not, depends on the log offset of paimon snapshot
-        CloseableIterator<Row> actual =
-                streamTEnv.executeSql("select * from " + tableName).collect();
-        assertResultsIgnoreOrder(
-                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
-
-        // can database sync job
-        jobClient.cancel().get();
-
-        // write some log data again
-        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
-
-        // query the log table again and check the data
-        // it should read both paimon snapshot and fluss log
-        actual =
-                streamTEnv
-                        .executeSql(
-                                "select * from "
-                                        + tableName
-                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
-                        .collect();
-        if (isPartitioned) {
-            // we write to a new partition to verify partition discovery
-            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
-        }
-        assertResultsIgnoreOrder(
-                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
-    }
-
-    @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception 
{
-        // first of all, start tiering
-        JobClient jobClient = buildTieringJob(execEnv);
-
-        String tableName1 =
-                "restore_logTable_" + (isPartitioned ? "partitioned" : 
"non_partitioned");
-        String resultTableName =
-                "result_table" + (isPartitioned ? "partitioned" : 
"non_partitioned");
-
-        TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
-        TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
-        List<Row> writtenRows = new LinkedList<>();
-        long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, 
isPartitioned, writtenRows);
-        // wait until records has been synced
-        waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, 
isPartitioned);
-
-        StreamTableEnvironment streamTEnv = buildSteamTEnv(null);
-        // now, start to read the log table to write to a fluss result table
-        // may read fluss or not, depends on the log offset of paimon snapshot
-        createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, 
false);
-        TableResult insertResult =
-                streamTEnv.executeSql(
-                        "insert into " + resultTableName + " select * from " + 
tableName1);
-
-        CloseableIterator<Row> actual =
-                streamTEnv.executeSql("select * from " + 
resultTableName).collect();
-        if (isPartitioned) {
-            assertRowResultsIgnoreOrder(actual, writtenRows, false);
-        } else {
-            assertResultsExactOrder(actual, writtenRows, false);
-        }
-
-        // now, stop the job with save point
-        String savepointPath =
-                insertResult
-                        .getJobClient()
-                        .get()
-                        .stopWithSavepoint(
-                                false,
-                                savepointDir.getAbsolutePath(),
-                                SavepointFormatType.CANONICAL)
-                        .get();
-
-        // re buildSteamTEnv
-        streamTEnv = buildSteamTEnv(savepointPath);
-        insertResult =
-                streamTEnv.executeSql(
-                        "insert into " + resultTableName + " select * from " + 
tableName1);
-
-        // write some log data again
-        List<Row> rows = writeRows(table1, 3, isPartitioned);
-        if (isPartitioned) {
-            assertRowResultsIgnoreOrder(actual, rows, true);
-        } else {
-            assertResultsExactOrder(actual, rows, true);
-        }
-
-        // cancel jobs
-        insertResult.getJobClient().get().cancel().get();
-        jobClient.cancel().get();
-    }
-
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, 
List<Row> flinkRows)
             throws Exception {
@@ -263,6 +150,43 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         return t1Id;
     }
 
+    protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, 
boolean isPartitioned)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("f_boolean", DataTypes.BOOLEAN())
+                        .column("f_byte", DataTypes.TINYINT())
+                        .column("f_short", DataTypes.SMALLINT())
+                        .column("f_int", DataTypes.INT())
+                        .column("f_long", DataTypes.BIGINT())
+                        .column("f_float", DataTypes.FLOAT())
+                        .column("f_double", DataTypes.DOUBLE())
+                        .column("f_string", DataTypes.STRING())
+                        .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+                        .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+                        .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
+                        .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
+                        .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+                        .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+                        .column("f_binary", DataTypes.BINARY(4));
+
+        TableDescriptor.Builder tableBuilder =
+                TableDescriptor.builder()
+                        .distributedBy(bucketNum, "f_int")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+
+        if (isPartitioned) {
+            schemaBuilder.column("p", DataTypes.STRING());
+            tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, 
true);
+            tableBuilder.partitionedBy("p");
+            tableBuilder.property(
+                    ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, 
AutoPartitionTimeUnit.YEAR);
+        }
+        tableBuilder.schema(schemaBuilder.build());
+        return createTable(tablePath, tableBuilder.build());
+    }
+
     private List<Row> writeFullTypeRows(
             TablePath tablePath, int rowCount, @Nullable String partition) 
throws Exception {
         List<InternalRow> rows = new ArrayList<>();
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
new file mode 100644
index 000000000..8438c2499
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadTestBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+
+/** Base class for iceberg union read test. */
+class FlinkUnionReadTestBase extends FlinkIcebergTieringTestBase {
+    protected static final String DEFAULT_DB = "fluss";
+
+    protected static final String CATALOG_NAME = "test_iceberg_lake";
+    protected static final int DEFAULT_BUCKET_NUM = 1;
+    StreamTableEnvironment batchTEnv;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        FlinkIcebergTieringTestBase.beforeAll();
+    }
+
+    @BeforeEach
+    public void beforeEach() {
+        super.beforeEach();
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create table environment
+        batchTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inBatchMode());
+        // create catalog using sql
+        batchTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        batchTEnv.executeSql("use catalog " + CATALOG_NAME);
+        batchTEnv.executeSql("use " + DEFAULT_DB);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
index 85cbe9f68..42ac4d6fb 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordAsFlussRowTest.java
@@ -79,7 +79,7 @@ class IcebergRecordAsFlussRowTest {
         record.setField("__offset", 100L);
         record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
 
-        icebergRecordAsFlussRow.setIcebergRecord(record);
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
         // Should return count excluding system columns (3 system columns)
         assertThat(icebergRecordAsFlussRow.getFieldCount()).isEqualTo(13);
@@ -91,7 +91,7 @@ class IcebergRecordAsFlussRowTest {
         record.setField("name", null); // null value
         record.setField("age", 30);
 
-        icebergRecordAsFlussRow.setIcebergRecord(record);
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
         assertThat(icebergRecordAsFlussRow.isNullAt(0)).isFalse(); // id
         assertThat(icebergRecordAsFlussRow.isNullAt(1)).isTrue(); // name
@@ -120,7 +120,7 @@ class IcebergRecordAsFlussRowTest {
         record.setField("__offset", 100L);
         record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
 
-        icebergRecordAsFlussRow.setIcebergRecord(record);
+        icebergRecordAsFlussRow.replaceIcebergRecord(record);
 
         // Test all data type conversions
         assertThat(icebergRecordAsFlussRow.getLong(0)).isEqualTo(12345L); // id
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
new file mode 100644
index 000000000..bcf559348
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergRecordReaderTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.RecordReader;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.types.BigIntType;
+import org.apache.fluss.types.BinaryType;
+import org.apache.fluss.types.BooleanType;
+import org.apache.fluss.types.CharType;
+import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.DoubleType;
+import org.apache.fluss.types.FloatType;
+import org.apache.fluss.types.IntType;
+import org.apache.fluss.types.LocalZonedTimestampType;
+import org.apache.fluss.types.RowType;
+import org.apache.fluss.types.SmallIntType;
+import org.apache.fluss.types.StringType;
+import org.apache.fluss.types.TimestampType;
+import org.apache.fluss.types.TinyIntType;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenericReader;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergRecordReader}. */
+class IcebergRecordReaderTest extends IcebergSourceTestBase {
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testReadLogTable(boolean isPartitioned) throws Exception {
+        // prepare iceberg table
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_DB, isPartitioned ? DEFAULT_TABLE + 
"_partitioned" : DEFAULT_TABLE);
+        createFullTypeLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM);
+
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRow.createFieldGetters(
+                        RowType.of(
+                                new BigIntType(),
+                                new StringType(),
+                                new IntType(),
+                                new DoubleType(),
+                                new BooleanType(),
+                                new TinyIntType(),
+                                new SmallIntType(),
+                                new FloatType(),
+                                new DecimalType(10, 2),
+                                new TimestampType(),
+                                new LocalZonedTimestampType(),
+                                new BinaryType(),
+                                new CharType(),
+                                new StringType()));
+
+        // write data
+        Table table = getTable(tablePath);
+        List<InternalRow> writtenRows = new ArrayList<>();
+        writtenRows.addAll(writeFullTypeRows(table, 9, isPartitioned ? "p1" : 
null));
+        writtenRows.addAll(writeFullTypeRows(table, 20, isPartitioned ? "p2" : 
null));
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        List<Row> actual = new ArrayList<>();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        for (IcebergSplit icebergSplit : 
lakeSource.createPlanner(snapshot::snapshotId).plan()) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> 
icebergSplit);
+            CloseableIterator<LogRecord> iterator = recordReader.read();
+            actual.addAll(
+                    convertToFlinkRow(
+                            fieldGetters,
+                            TransformingCloseableIterator.transform(iterator, 
LogRecord::getRow)));
+            iterator.close();
+        }
+        List<Row> expect =
+                convertToFlinkRow(fieldGetters, 
CloseableIterator.wrap(writtenRows.iterator()));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expect);
+
+        // test project
+        InternalRow.FieldGetter[] projectFieldGetters =
+                InternalRow.createFieldGetters(
+                        RowType.of(new TinyIntType(), new StringType(), new 
DoubleType()));
+        lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new 
int[] {3}});
+
+        List<Row> projectActual = new ArrayList<>();
+        for (IcebergSplit icebergSplit : 
lakeSource.createPlanner(snapshot::snapshotId).plan()) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> 
icebergSplit);
+            CloseableIterator<LogRecord> iterator = recordReader.read();
+            projectActual.addAll(
+                    convertToFlinkRow(
+                            projectFieldGetters,
+                            TransformingCloseableIterator.transform(iterator, 
LogRecord::getRow)));
+            iterator.close();
+        }
+
+        TableScan tableScan =
+                table.newScan()
+                        .project(
+                                new Schema(
+                                        optional(6, "tiny_int", 
Types.IntegerType.get()),
+                                        optional(2, "name", 
Types.StringType.get()),
+                                        optional(4, "salary", 
Types.DoubleType.get())));
+        IcebergGenericReader reader = new IcebergGenericReader(tableScan, 
true);
+        List<Row> projectExpect = new ArrayList<>();
+        try (CloseableIterable<FileScanTask> fileScanTasks = 
tableScan.planFiles()) {
+            for (FileScanTask task : fileScanTasks) {
+                org.apache.iceberg.io.CloseableIterator<Record> iterator =
+                        reader.open(task).iterator();
+                IcebergRecordAsFlussRow recordAsFlussRow = new 
IcebergRecordAsFlussRow();
+                projectExpect.addAll(
+                        convertToFlinkRow(
+                                projectFieldGetters,
+                                TransformingCloseableIterator.transform(
+                                        CloseableIterator.wrap(iterator),
+                                        
recordAsFlussRow::replaceIcebergRecord)));
+                iterator.close();
+            }
+        }
+        
assertThat(projectActual).containsExactlyInAnyOrderElementsOf(projectExpect);
+    }
+
+    private void createFullTypeLogTable(TablePath tablePath, boolean 
isPartitioned, int bucketNum)
+            throws Exception {
+        // Create a schema with various data types
+        Schema schema =
+                new Schema(
+                        required(1, "id", Types.LongType.get()),
+                        optional(2, "name", Types.StringType.get()),
+                        optional(3, "age", Types.IntegerType.get()),
+                        optional(4, "salary", Types.DoubleType.get()),
+                        optional(5, "is_active", Types.BooleanType.get()),
+                        optional(6, "tiny_int", Types.IntegerType.get()),
+                        optional(7, "small_int", Types.IntegerType.get()),
+                        optional(8, "float_val", Types.FloatType.get()),
+                        optional(9, "decimal_val", Types.DecimalType.of(10, 
2)),
+                        optional(10, "timestamp_ntz", 
Types.TimestampType.withoutZone()),
+                        optional(11, "timestamp_ltz", 
Types.TimestampType.withZone()),
+                        optional(12, "binary_data", Types.BinaryType.get()),
+                        optional(13, "char_data", Types.StringType.get()),
+                        optional(14, "dt", Types.StringType.get()),
+
+                        // System columns
+                        required(15, "__bucket", Types.IntegerType.get()),
+                        required(16, "__offset", Types.LongType.get()),
+                        required(17, "__timestamp", 
Types.TimestampType.withZone()));
+        PartitionSpec partitionSpec =
+                isPartitioned
+                        ? PartitionSpec.builderFor(schema)
+                                .identity("dt")
+                                .bucket("id", bucketNum)
+                                .build()
+                        : PartitionSpec.builderFor(schema).bucket("id", 
bucketNum).build();
+        createTable(tablePath, schema, partitionSpec);
+    }
+
+    private List<InternalRow> writeFullTypeRows(
+            Table table, int rowCount, @Nullable String partition) throws 
Exception {
+        List<Record> records = new ArrayList<>();
+        List<InternalRow> flussRows = new ArrayList<>();
+        for (int i = 0; i < rowCount; i++) {
+            GenericRecord record = GenericRecord.create(table.schema());
+
+            record.setField("id", (long) i);
+            record.setField("name", "name_" + i);
+            record.setField("age", 20 + (i % 30));
+            record.setField("salary", 50000.0 + (i * 1000));
+            record.setField("is_active", i % 2 == 0);
+            record.setField("tiny_int", i % 128);
+            record.setField("small_int", i % 32768);
+            record.setField("float_val", 100.5f + i);
+            record.setField("decimal_val", new BigDecimal(i + 100.25));
+            record.setField("timestamp_ntz", LocalDateTime.now());
+            record.setField("timestamp_ltz", 
OffsetDateTime.now(ZoneOffset.UTC));
+            record.setField("binary_data", ByteBuffer.wrap("Hello 
World!".getBytes()));
+            record.setField("char_data", "char_" + i);
+            record.setField("dt", partition);
+
+            record.setField("__bucket", 0);
+            record.setField("__offset", (long) i);
+            record.setField("__timestamp", OffsetDateTime.now(ZoneOffset.UTC));
+
+            records.add(record);
+
+            IcebergRecordAsFlussRow row = new IcebergRecordAsFlussRow();
+            row.replaceIcebergRecord(record);
+            flussRows.add(row);
+        }
+        writeRecord(table, records, partition, 0);
+        return flussRows;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
new file mode 100644
index 000000000..b4d770062
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSourceTestBase.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.IcebergLakeStorage;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.TaskWriter;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/** Base class for Iceberg source tests. */
+class IcebergSourceTestBase {
+    protected static final String DEFAULT_DB = "fluss_lakehouse";
+    protected static final String DEFAULT_TABLE = "test_lakehouse_table";
+    protected static final int DEFAULT_BUCKET_NUM = 1;
+
+    private static @TempDir File tempWarehouseDir;
+    protected static IcebergLakeStorage lakeStorage;
+    protected static Catalog icebergCatalog;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        Configuration configuration = new Configuration();
+        configuration.setString("warehouse", "file://" + 
tempWarehouseDir.toString());
+        configuration.setString("type", "hadoop");
+        configuration.setString("name", "fluss_test_catalog");
+        lakeStorage = new IcebergLakeStorage(configuration);
+        icebergCatalog = 
IcebergCatalogUtils.createIcebergCatalog(configuration);
+    }
+
+    public void createTable(TablePath tablePath, Schema schema, PartitionSpec 
partitionSpec)
+            throws Exception {
+        if (!((SupportsNamespaces) icebergCatalog)
+                .namespaceExists(Namespace.of(tablePath.getDatabaseName()))) {
+            ((SupportsNamespaces) icebergCatalog)
+                    
.createNamespace(Namespace.of(tablePath.getDatabaseName()));
+        }
+        icebergCatalog.createTable(
+                TableIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getTableName()),
+                schema,
+                partitionSpec);
+    }
+
+    public void writeRecord(
+            Table table, List<Record> records, @Nullable String partition, int 
bucket)
+            throws Exception {
+        try (TaskWriter<Record> taskWriter =
+                TaskWriterFactory.createTaskWriter(table, partition, bucket)) {
+            for (Record r : records) {
+                taskWriter.write(r);
+            }
+            DataFile[] dataFiles = taskWriter.dataFiles();
+            checkState(dataFiles.length == 1);
+            table.newAppend().appendFile(dataFiles[0]).commit();
+        }
+    }
+
+    public static List<Row> convertToFlinkRow(
+            org.apache.fluss.row.InternalRow.FieldGetter[] fieldGetters,
+            CloseableIterator<InternalRow> flussRowIterator) {
+        List<Row> rows = new ArrayList<>();
+        while (flussRowIterator.hasNext()) {
+            org.apache.fluss.row.InternalRow row = flussRowIterator.next();
+            Row flinkRow = new Row(fieldGetters.length);
+            for (int i = 0; i < fieldGetters.length; i++) {
+                flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            rows.add(flinkRow);
+        }
+        return rows;
+    }
+
+    public Table getTable(TablePath tablePath) throws Exception {
+        return icebergCatalog.loadTable(
+                TableIdentifier.of(tablePath.getDatabaseName(), 
tablePath.getTableName()));
+    }
+
+    public GenericRecord createIcebergRecord(Schema schema, Object... values) {
+        GenericRecord record = GenericRecord.create(schema);
+        for (int i = 0; i < values.length; i++) {
+            record.set(i, values[i]);
+        }
+        return record;
+    }
+
+    /** Adapter for transforming closeable iterator. */
+    public static class TransformingCloseableIterator<T, U> implements 
CloseableIterator<U> {
+        private final CloseableIterator<T> source;
+        private final Function<? super T, ? extends U> transformer;
+
+        public TransformingCloseableIterator(
+                CloseableIterator<T> source, Function<? super T, ? extends U> 
transformer) {
+            this.source = source;
+            this.transformer = transformer;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return source.hasNext();
+        }
+
+        @Override
+        public U next() {
+            return transformer.apply(source.next());
+        }
+
+        @Override
+        public void close() {
+            source.close();
+        }
+
+        public static <T, U> CloseableIterator<U> transform(
+                CloseableIterator<T> source, Function<? super T, ? extends U> 
transformer) {
+            return new TransformingCloseableIterator<>(source, transformer);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
new file mode 100644
index 000000000..2697339e6
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitPlannerTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test class for {@link IcebergSplitPlanner}. */
+class IcebergSplitPlannerTest extends IcebergSourceTestBase {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testLogTablePlan(boolean isPartitioned) throws Exception {
+        // prepare iceberg log table
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_DB, "log_" + (isPartitioned ? "partitioned_" : 
"") + DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec =
+                isPartitioned
+                        ? 
PartitionSpec.builderFor(schema).identity("c2").bucket("c1", 2).build()
+                        : PartitionSpec.builderFor(schema).bucket("c1", 
2).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+        GenericRecord record1 = createIcebergRecord(schema, 12, "a", "A");
+        GenericRecord record2 = createIcebergRecord(schema, 13, "b", "B");
+
+        writeRecord(table, Collections.singletonList(record1), isPartitioned ? 
"a" : null, 0);
+        writeRecord(table, Collections.singletonList(record2), isPartitioned ? 
"b" : null, 1);
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> icebergSplits = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+        assertThat(icebergSplits.size()).isEqualTo(2);
+        // Log table is bucket-aware
+        assertThat(icebergSplits.stream().map(IcebergSplit::bucket))
+                .containsExactlyInAnyOrder(0, 1);
+        if (isPartitioned) {
+            assertThat(icebergSplits.stream().map(IcebergSplit::partition))
+                    .containsExactlyInAnyOrder(
+                            Collections.singletonList("a"), 
Collections.singletonList("b"));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testBucketUnawareLogTablePlan(boolean isPartitioned) throws Exception 
{
+        // prepare an iceberg table corresponding to a Fluss bucket-unaware log table
+        TablePath tablePath =
+                TablePath.of(
+                        DEFAULT_DB,
+                        "log_"
+                                + (isPartitioned ? "partitioned_" : "")
+                                + DEFAULT_TABLE
+                                + "_fluss_bucket");
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()),
+                        // System columns
+                        required(14, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
+                        required(15, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                        required(16, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone()));
+        PartitionSpec partitionSpec =
+                isPartitioned
+                        ? PartitionSpec.builderFor(schema)
+                                .identity("c2")
+                                .identity(BUCKET_COLUMN_NAME)
+                                .build()
+                        : 
PartitionSpec.builderFor(schema).identity(BUCKET_COLUMN_NAME).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+        GenericRecord record1 =
+                createIcebergRecord(
+                        schema, 12, "a", "A", 0, 100L, 
OffsetDateTime.now(ZoneOffset.UTC));
+        GenericRecord record2 =
+                createIcebergRecord(
+                        schema, 13, "b", "B", 1, 200L, 
OffsetDateTime.now(ZoneOffset.UTC));
+
+        writeRecord(table, Collections.singletonList(record1), isPartitioned ? 
"a" : null, 0);
+        writeRecord(table, Collections.singletonList(record2), isPartitioned ? 
"b" : null, 1);
+
+        // refresh table
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> icebergSplits = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+        assertThat(icebergSplits.size()).isEqualTo(2);
+        // Log table is not bucket-aware
+        assertThat(icebergSplits.get(0).bucket()).isEqualTo(-1);
+        assertThat(icebergSplits.get(1).bucket()).isEqualTo(-1);
+        if (isPartitioned) {
+            assertThat(icebergSplits.stream().map(IcebergSplit::partition))
+                    .containsExactlyInAnyOrder(
+                            Collections.singletonList("a"), 
Collections.singletonList("b"));
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
new file mode 100644
index 000000000..a8df50db0
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/source/IcebergSplitSerializerTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package org.apache.fluss.lake.iceberg.source;
+
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link IcebergSplitSerializer}. */
+class IcebergSplitSerializerTest extends IcebergSourceTestBase {
+    private final IcebergSplitSerializer serializer = new 
IcebergSplitSerializer();
+
+    @Test
+    void testSerializeAndDeserialize() throws Exception {
+        // prepare iceberg table
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema schema =
+                new Schema(
+                        optional(1, "c1", Types.IntegerType.get()),
+                        optional(2, "c2", Types.StringType.get()),
+                        optional(3, "c3", Types.StringType.get()));
+        PartitionSpec partitionSpec =
+                PartitionSpec.builderFor(schema).bucket("c1", 
DEFAULT_BUCKET_NUM).build();
+        createTable(tablePath, schema, partitionSpec);
+
+        // write data
+        Table table = getTable(tablePath);
+
+        GenericRecord record1 = createIcebergRecord(schema, 12, "a", "A");
+        GenericRecord record2 = createIcebergRecord(schema, 13, "b", "B");
+
+        writeRecord(table, Arrays.asList(record1, record2), null, 0);
+        table.refresh();
+        Snapshot snapshot = table.currentSnapshot();
+
+        LakeSource<IcebergSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<IcebergSplit> plan = 
lakeSource.createPlanner(snapshot::snapshotId).plan();
+
+        IcebergSplit originalIcebergSplit = plan.get(0);
+        byte[] serialized = serializer.serialize(originalIcebergSplit);
+        IcebergSplit deserialized = 
serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserialized.fileScanTask().file().location())
+                
.isEqualTo(originalIcebergSplit.fileScanTask().file().location());
+        
assertThat(deserialized.partition()).isEqualTo(originalIcebergSplit.partition());
+        
assertThat(deserialized.bucket()).isEqualTo(originalIcebergSplit.bucket());
+    }
+
+    @Test
+    void testDeserializeWithInvalidData() {
+        byte[] invalidData = "invalid".getBytes();
+        assertThatThrownBy(() -> serializer.deserialize(1, invalidData))
+                .isInstanceOf(IOException.class);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 2a216ddea..81bfb0bd0 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -80,6 +80,7 @@ import static 
org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
 import static org.apache.iceberg.expressions.Expressions.equal;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -461,4 +462,32 @@ public class FlinkIcebergTieringTestBase {
         writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
         return writtenRowsByPartition;
     }
+
+    protected void waitUntilBucketSynced(
+            TablePath tablePath, long tableId, int bucketCount, boolean 
isPartition) {
+        if (isPartition) {
+            Map<Long, String> partitionById = waitUntilPartitions(tablePath);
+            for (Long partitionId : partitionById.keySet()) {
+                for (int i = 0; i < bucketCount; i++) {
+                    TableBucket tableBucket = new TableBucket(tableId, 
partitionId, i);
+                    waitUntilBucketSynced(tableBucket);
+                }
+            }
+        } else {
+            for (int i = 0; i < bucketCount; i++) {
+                TableBucket tableBucket = new TableBucket(tableId, i);
+                waitUntilBucketSynced(tableBucket);
+            }
+        }
+    }
+
+    protected void waitUntilBucketSynced(TableBucket tb) {
+        waitUntil(
+                () -> {
+                    Replica replica = getLeaderReplica(tb);
+                    return replica.getLogTablet().getLakeTableSnapshotId() >= 
0;
+                },
+                Duration.ofMinutes(2),
+                "bucket " + tb + " not synced");
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
index 9d18e9d5c..084876475 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/utils/IcebergConversionsTest.java
@@ -19,7 +19,6 @@
 package org.apache.fluss.lake.iceberg.utils;
 
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.lake.iceberg.conf.IcebergConfiguration;
 import org.apache.fluss.metadata.TablePath;
 
 import org.apache.iceberg.PartitionKey;
@@ -35,10 +34,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.util.Map;
 
 import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
-import static org.apache.iceberg.CatalogUtil.buildIcebergCatalog;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -69,11 +66,7 @@ class IcebergConversionsTest {
         configuration.setString("warehouse", 
tempWarehouseDir.toURI().toString());
         configuration.setString("catalog-impl", 
"org.apache.iceberg.inmemory.InMemoryCatalog");
         configuration.setString("name", "fluss_test_catalog");
-
-        Map<String, String> icebergProps = configuration.toMap();
-        String catalogName = icebergProps.getOrDefault("name", 
"default_iceberg_catalog");
-        return buildIcebergCatalog(
-                catalogName, icebergProps, 
IcebergConfiguration.from(configuration).get());
+        return IcebergCatalogUtils.createIcebergCatalog(configuration);
     }
 
     private Table createIcebergTable(
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
index 67e760f57..979356fa2 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -49,6 +49,7 @@ import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
  * paimon table.
  */
 public class PaimonLakeSource implements LakeSource<PaimonSplit> {
+    private static final long serialVersionUID = 1L;
 
     private final Configuration paimonConfig;
     private final TablePath tablePath;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 987bc0451..3cd682ab2 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -85,7 +85,7 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
 
         assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
 
-        // can database sync job
+        // cancel the tiering job
         jobClient.cancel().get();
 
         // write some log data again
@@ -155,7 +155,7 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         assertResultsIgnoreOrder(
                 actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
 
-        // can database sync job
+        // cancel the tiering job
         jobClient.cancel().get();
 
         // write some log data again
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index f2baea535..df13bb696 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -388,6 +388,7 @@
                                         
<exclude>org.apache.fluss.lake.iceberg.*</exclude>
                                         
<exclude>org.apache.fluss.row.encode.iceberg.*</exclude>
                                         
<exclude>org.apache.fluss.bucketing.IcebergBucketingFunction</exclude>
+                                        
<exclude>org.apache.iceberg.transforms.TransformUtils</exclude>
                                         <!-- start exclude for flink tiering 
service -->
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>

Reply via email to