This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new fb6f6d59f [paimon] Paimon implement union read interfaces (#1505)
fb6f6d59f is described below

commit fb6f6d59f38fbf165b8a5553c067fa66d28407c4
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Aug 11 17:23:20 2025 +0800

    [paimon] Paimon implement union read interfaces (#1505)
---
 .../fluss/lake/paimon/PaimonLakeStorage.java       |   6 +-
 .../FlussRowAsPaimonRow.java}                      |  85 ++----
 .../fluss/lake/paimon/source/PaimonLakeSource.java | 110 ++++++++
 .../lake/paimon/source/PaimonRecordReader.java     | 152 +++++++++++
 .../paimon/source/PaimonSortedRecordReader.java    |  64 +++++
 .../fluss/lake/paimon/source/PaimonSplit.java      |  64 +++++
 .../lake/paimon/source/PaimonSplitPlanner.java     |  96 +++++++
 .../lake/paimon/source/PaimonSplitSerializer.java  |  59 ++++
 .../paimon/tiering/FlussRecordAsPaimonRow.java     | 126 +--------
 .../fluss/lake/paimon/utils/PaimonConversions.java |  15 +
 .../lake/paimon/utils/PaimonRowAsFlussRow.java     | 141 ++++++++++
 .../lake/paimon/flink/FlinkUnionReadTestBase.java  |   3 +-
 .../paimon/source/FlussRowAsPaimonRowTest.java     | 143 ++++++++++
 .../lake/paimon/source/PaimonRecordReaderTest.java | 303 +++++++++++++++++++++
 .../source/PaimonSortedRecordReaderTest.java       | 212 ++++++++++++++
 .../lake/paimon/source/PaimonSourceTestBase.java   | 138 ++++++++++
 .../lake/paimon/source/PaimonSplitPlannerTest.java |  84 ++++++
 .../paimon/source/PaimonSplitSerializerTest.java   |  86 ++++++
 .../fluss/lake/paimon/source/PaimonSplitTest.java  |  83 ++++++
 .../testutils/FlinkPaimonTieringTestBase.java      |   2 +-
 20 files changed, 1789 insertions(+), 183 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
index 03da696a0..f3eac859a 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeStorage.java
@@ -19,6 +19,8 @@ package com.alibaba.fluss.lake.paimon;
 
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.paimon.source.PaimonLakeSource;
+import com.alibaba.fluss.lake.paimon.source.PaimonSplit;
 import com.alibaba.fluss.lake.paimon.tiering.PaimonCommittable;
 import com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory;
 import com.alibaba.fluss.lake.paimon.tiering.PaimonWriteResult;
@@ -46,7 +48,7 @@ public class PaimonLakeStorage implements LakeStorage {
     }
 
     @Override
-    public LakeSource<?> createLakeSource(TablePath tablePath) {
-        throw new UnsupportedOperationException("Not implemented");
+    public LakeSource<PaimonSplit> createLakeSource(TablePath tablePath) {
+        return new PaimonLakeSource(paimonConfig, tablePath);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
similarity index 61%
copy from 
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
copy to 
fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
index 97d543148..ddcc5b8c3 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRow.java
@@ -1,12 +1,13 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,9 +16,8 @@
  * limitations under the License.
  */
 
-package com.alibaba.fluss.lake.paimon.tiering;
+package com.alibaba.fluss.lake.paimon.source;
 
-import com.alibaba.fluss.record.LogRecord;
 import com.alibaba.fluss.row.TimestampLtz;
 import com.alibaba.fluss.row.TimestampNtz;
 
@@ -32,45 +32,30 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
-import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
+/** Adapter class for converting Fluss row to Paimon row. */
+public class FlussRowAsPaimonRow implements InternalRow {
 
-/** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
-public class FlussRecordAsPaimonRow implements InternalRow {
+    protected com.alibaba.fluss.row.InternalRow internalRow;
+    protected final RowType tableRowType;
 
-    // Lake table for paimon will append three system columns: __bucket, 
__offset,__timestamp
-    private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
-    private final RowType tableTowType;
-    private final int bucket;
-    private LogRecord logRecord;
-    private int originRowFieldCount;
-    private com.alibaba.fluss.row.InternalRow internalRow;
-
-    public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
-        this.bucket = bucket;
-        this.tableTowType = tableTowType;
+    public FlussRowAsPaimonRow(RowType tableTowType) {
+        this.tableRowType = tableTowType;
     }
 
-    public void setFlussRecord(LogRecord logRecord) {
-        this.logRecord = logRecord;
-        this.internalRow = logRecord.getRow();
-        this.originRowFieldCount = internalRow.getFieldCount();
-        checkState(
-                originRowFieldCount == tableTowType.getFieldCount() - 
LAKE_PAIMON_SYSTEM_COLUMNS,
-                "The paimon table fields count must equals to LogRecord's 
fields count.");
+    public FlussRowAsPaimonRow(
+            com.alibaba.fluss.row.InternalRow internalRow, RowType 
tableTowType) {
+        this.internalRow = internalRow;
+        this.tableRowType = tableTowType;
     }
 
     @Override
     public int getFieldCount() {
-        return
-        //  business (including partitions) + system (three system fields: 
bucket, offset,
-        // timestamp)
-        originRowFieldCount + LAKE_PAIMON_SYSTEM_COLUMNS;
+        return internalRow.getFieldCount();
     }
 
     @Override
     public RowKind getRowKind() {
-        return toRowKind(logRecord.getChangeType());
+        return RowKind.INSERT;
     }
 
     @Override
@@ -80,11 +65,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
 
     @Override
     public boolean isNullAt(int pos) {
-        if (pos < originRowFieldCount) {
-            return internalRow.isNullAt(pos);
-        }
-        // is the last three system fields: bucket, offset, timestamp which 
are never null
-        return false;
+        return internalRow.isNullAt(pos);
     }
 
     @Override
@@ -104,23 +85,11 @@ public class FlussRecordAsPaimonRow implements InternalRow 
{
 
     @Override
     public int getInt(int pos) {
-        if (pos == originRowFieldCount) {
-            // bucket system column
-            return bucket;
-        }
         return internalRow.getInt(pos);
     }
 
     @Override
     public long getLong(int pos) {
-        if (pos == originRowFieldCount + 1) {
-            //  offset system column
-            return logRecord.logOffset();
-        } else if (pos == originRowFieldCount + 2) {
-            //  timestamp system column
-            return logRecord.timestamp();
-        }
-        //  the origin RowData
         return internalRow.getLong(pos);
     }
 
@@ -151,13 +120,7 @@ public class FlussRecordAsPaimonRow implements InternalRow 
{
 
     @Override
     public Timestamp getTimestamp(int pos, int precision) {
-        // it's timestamp system column
-        if (pos == originRowFieldCount + 2) {
-            return Timestamp.fromEpochMillis(logRecord.timestamp());
-        }
-
-        DataType paimonTimestampType = tableTowType.getTypeAt(pos);
-
+        DataType paimonTimestampType = tableRowType.getTypeAt(pos);
         switch (paimonTimestampType.getTypeRoot()) {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 if (TimestampNtz.isCompact(precision)) {
@@ -191,7 +154,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
     }
 
     @Override
-    public Variant getVariant(int pos) {
+    public Variant getVariant(int i) {
         throw new UnsupportedOperationException(
                 "getVariant is not support for Fluss record currently.");
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
new file mode 100644
index 000000000..a41f5a2fc
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonLakeSource.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.predicate.Predicate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/**
+ * Paimon Lake format implementation of {@link 
com.alibaba.fluss.lake.source.LakeSource} for reading
+ * paimon table.
+ */
+public class PaimonLakeSource implements LakeSource<PaimonSplit> {
+
+    private final Configuration paimonConfig;
+    private final TablePath tablePath;
+
+    private @Nullable int[][] project;
+    private @Nullable org.apache.paimon.predicate.Predicate predicate;
+
+    public PaimonLakeSource(Configuration paimonConfig, TablePath tablePath) {
+        this.paimonConfig = paimonConfig;
+        this.tablePath = tablePath;
+    }
+
+    @Override
+    public void withProject(int[][] project) {
+        this.project = project;
+    }
+
+    @Override
+    public void withLimit(int limit) {
+        throw new UnsupportedOperationException("Not impl.");
+    }
+
+    @Override
+    public FilterPushDownResult withFilters(List<Predicate> predicates) {
+        return FilterPushDownResult.of(Collections.emptyList(), predicates);
+    }
+
+    @Override
+    public Planner<PaimonSplit> createPlanner(PlannerContext plannerContext) {
+        return new PaimonSplitPlanner(
+                paimonConfig, tablePath, predicate, 
plannerContext.snapshotId());
+    }
+
+    @Override
+    public RecordReader createRecordReader(ReaderContext<PaimonSplit> context) 
throws IOException {
+        try (Catalog catalog = getCatalog()) {
+            FileStoreTable fileStoreTable = getTable(catalog, tablePath);
+            if (fileStoreTable.primaryKeys().isEmpty()) {
+                return new PaimonRecordReader(
+                        fileStoreTable, context.lakeSplit(), project, 
predicate);
+            } else {
+                return new PaimonSortedRecordReader(
+                        fileStoreTable, context.lakeSplit(), project, 
predicate);
+            }
+        } catch (Exception e) {
+            throw new IOException("Fail to create record reader.", e);
+        }
+    }
+
+    @Override
+    public SimpleVersionedSerializer<PaimonSplit> getSplitSerializer() {
+        return new PaimonSplitSerializer();
+    }
+
+    private Catalog getCatalog() {
+        return CatalogFactory.createCatalog(
+                CatalogContext.create(Options.fromMap(paimonConfig.toMap())));
+    }
+
+    private FileStoreTable getTable(Catalog catalog, TablePath tablePath) 
throws Exception {
+        return (FileStoreTable) catalog.getTable(toPaimon(tablePath));
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
new file mode 100644
index 000000000..1ace3dfcd
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.IntStream;
+
+import static 
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toChangeType;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
+/** Record reader for paimon table. */
+public class PaimonRecordReader implements RecordReader {
+
+    protected PaimonRowAsFlussRecordIterator iterator;
+    protected @Nullable int[][] project;
+    protected @Nullable Predicate predicate;
+    protected RowType paimonRowType;
+
+    public PaimonRecordReader(
+            FileStoreTable fileStoreTable,
+            PaimonSplit split,
+            @Nullable int[][] project,
+            @Nullable Predicate predicate)
+            throws IOException {
+        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+        RowType paimonFullRowType = fileStoreTable.rowType();
+        if (project != null) {
+            readBuilder = applyProject(readBuilder, project, 
paimonFullRowType);
+        }
+
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
+
+        TableRead tableRead = readBuilder.newRead();
+        paimonRowType = readBuilder.readType();
+
+        org.apache.paimon.reader.RecordReader<InternalRow> recordReader =
+                tableRead.createReader(split.dataSplit());
+        iterator =
+                new PaimonRecordReader.PaimonRowAsFlussRecordIterator(
+                        recordReader.toCloseableIterator(), paimonRowType);
+    }
+
+    @Override
+    public CloseableIterator<LogRecord> read() throws IOException {
+        return iterator;
+    }
+
+    private ReadBuilder applyProject(
+            ReadBuilder readBuilder, int[][] projects, RowType 
paimonFullRowType) {
+        int[] projectIds = Arrays.stream(projects).mapToInt(project -> 
project[0]).toArray();
+
+        int offsetFieldPos = 
paimonFullRowType.getFieldIndex(OFFSET_COLUMN_NAME);
+        int timestampFieldPos = 
paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
+
+        int[] paimonProject =
+                IntStream.concat(
+                                IntStream.of(projectIds),
+                                IntStream.of(offsetFieldPos, 
timestampFieldPos))
+                        .toArray();
+
+        return readBuilder.withProjection(paimonProject);
+    }
+
+    /** Iterator for paimon row as fluss record. */
+    public static class PaimonRowAsFlussRecordIterator implements 
CloseableIterator<LogRecord> {
+
+        private final org.apache.paimon.utils.CloseableIterator<InternalRow> 
paimonRowIterator;
+
+        private final ProjectedRow projectedRow;
+        private final PaimonRowAsFlussRow paimonRowAsFlussRow;
+
+        private final int logOffsetColIndex;
+        private final int timestampColIndex;
+
+        public PaimonRowAsFlussRecordIterator(
+                org.apache.paimon.utils.CloseableIterator<InternalRow> 
paimonRowIterator,
+                RowType paimonRowType) {
+            this.paimonRowIterator = paimonRowIterator;
+            this.logOffsetColIndex = 
paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
+            this.timestampColIndex = 
paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
+
+            int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 
2).toArray();
+            projectedRow = ProjectedRow.from(project);
+            paimonRowAsFlussRow = new PaimonRowAsFlussRow();
+        }
+
+        @Override
+        public void close() {
+            try {
+                paimonRowIterator.close();
+            } catch (Exception e) {
+                throw new RuntimeException("Fail to close iterator.", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return paimonRowIterator.hasNext();
+        }
+
+        @Override
+        public LogRecord next() {
+            InternalRow paimonRow = paimonRowIterator.next();
+            ChangeType changeType = toChangeType(paimonRow.getRowKind());
+            long offset = paimonRow.getLong(logOffsetColIndex);
+            long timestamp = paimonRow.getTimestamp(timestampColIndex, 
6).getMillisecond();
+
+            return new GenericRecord(
+                    offset,
+                    timestamp,
+                    changeType,
+                    
projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow)));
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
new file mode 100644
index 000000000..281695105
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.row.InternalRow;
+
+import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+/** Sorted record reader for primary key paimon table. */
+public class PaimonSortedRecordReader extends PaimonRecordReader implements 
SortedRecordReader {
+
+    Comparator<com.alibaba.fluss.row.InternalRow> comparator;
+
+    public PaimonSortedRecordReader(
+            FileStoreTable fileStoreTable,
+            PaimonSplit split,
+            @Nullable int[][] project,
+            @Nullable Predicate predicate)
+            throws IOException {
+        super(fileStoreTable, split, project, predicate);
+        this.comparator =
+                toFlussRowComparator(
+                        paimonRowType,
+                        ((KeyValueFileStore) 
fileStoreTable.store()).newKeyComparator());
+    }
+
+    @Override
+    public Comparator<InternalRow> order() {
+        return comparator;
+    }
+
+    private Comparator<com.alibaba.fluss.row.InternalRow> toFlussRowComparator(
+            RowType rowType, Comparator<org.apache.paimon.data.InternalRow> 
paimonRowcomparator) {
+        return (row1, row2) ->
+                paimonRowcomparator.compare(
+                        new FlussRowAsPaimonRow(row1, rowType),
+                        new FlussRowAsPaimonRow(row2, rowType));
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
new file mode 100644
index 000000000..b28f8b61c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplit.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSplit;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.source.DataSplit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Split for paimon table. */
+public class PaimonSplit implements LakeSplit {
+
+    private final DataSplit dataSplit;
+
+    public PaimonSplit(DataSplit dataSplit) {
+        this.dataSplit = dataSplit;
+    }
+
+    @Override
+    public int bucket() {
+        return dataSplit.bucket();
+    }
+
+    @Override
+    public List<String> partition() {
+        BinaryRow partition = dataSplit.partition();
+        if (partition.getFieldCount() == 0) {
+            return Collections.emptyList();
+        }
+
+        List<String> partitions = new ArrayList<>();
+        for (int i = 0; i < partition.getFieldCount(); i++) {
+            // TODO: Currently, partition columns must be of String type, so we 
can always
+            // treat them as strings. Revisit here when
+            // #489 is finished.
+            partitions.add(partition.getString(i).toString());
+        }
+        return partitions;
+    }
+
+    public DataSplit dataSplit() {
+        return dataSplit;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
new file mode 100644
index 000000000..e481d8102
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlanner.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.source.Planner;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.Split;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Split planner for paimon table. */
+public class PaimonSplitPlanner implements Planner<PaimonSplit> {
+
+    private final Configuration paimonConfig;
+    private final TablePath tablePath;
+    private final @Nullable Predicate predicate;
+    private final long snapshotId;
+
+    public PaimonSplitPlanner(
+            Configuration paimonConfig,
+            TablePath tablePath,
+            @Nullable Predicate predicate,
+            long snapshotId) {
+        this.paimonConfig = paimonConfig;
+        this.tablePath = tablePath;
+        this.predicate = predicate;
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public List<PaimonSplit> plan() {
+        try {
+            List<PaimonSplit> splits = new ArrayList<>();
+            try (Catalog catalog = getCatalog()) {
+                FileStoreTable fileStoreTable = getTable(catalog, tablePath, 
snapshotId);
+                // TODO: support filter .withFilter(predicate)
+                InnerTableScan tableScan = fileStoreTable.newScan();
+                for (Split split : tableScan.plan().splits()) {
+                    DataSplit dataSplit = (DataSplit) split;
+                    splits.add(new PaimonSplit(dataSplit));
+                }
+            }
+            return splits;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to plan splits for paimon.", e);
+        }
+    }
+
+    private Catalog getCatalog() {
+        return CatalogFactory.createCatalog(
+                CatalogContext.create(Options.fromMap(paimonConfig.toMap())));
+    }
+
+    private FileStoreTable getTable(Catalog catalog, TablePath tablePath, long 
snapshotId)
+            throws Exception {
+        return (FileStoreTable)
+                catalog.getTable(toPaimon(tablePath))
+                        .copy(
+                                Collections.singletonMap(
+                                        CoreOptions.SCAN_SNAPSHOT_ID.key(),
+                                        String.valueOf(snapshotId)));
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
new file mode 100644
index 000000000..112a8a3f2
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Serializer for {@link PaimonSplit}.
+ *
+ * <p>Only the {@link DataSplit} wrapped by the split is written; reading and writing it is
+ * delegated to Paimon's {@link InstantiationUtil}.
+ */
+public class PaimonSplitSerializer implements SimpleVersionedSerializer<PaimonSplit> {
+
+    /** The only serialization format version this serializer writes and understands. */
+    private static final int CURRENT_VERSION = 1;
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    /**
+     * Serializes the {@link DataSplit} wrapped by the given split.
+     *
+     * @param paimonSplit the split to serialize
+     * @return the serialized bytes of the wrapped data split
+     * @throws IOException if the data split cannot be written
+     */
+    @Override
+    public byte[] serialize(PaimonSplit paimonSplit) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        InstantiationUtil.serializeObject(view, paimonSplit.dataSplit());
+        return out.toByteArray();
+    }
+
+    /**
+     * Restores a {@link PaimonSplit} from bytes produced by {@link #serialize(PaimonSplit)}.
+     *
+     * @param version the version the bytes were written with
+     * @param serialized the serialized data split
+     * @return a split wrapping the deserialized {@link DataSplit}
+     * @throws IOException if the version is not supported, the bytes cannot be read, or a class
+     *     needed for deserialization cannot be found
+     */
+    @Override
+    public PaimonSplit deserialize(int version, byte[] serialized) throws IOException {
+        // fail fast instead of decoding bytes written in a format this class does not know
+        if (version != CURRENT_VERSION) {
+            throw new IOException(
+                    "Unsupported PaimonSplit serialization version "
+                            + version
+                            + ", expected "
+                            + CURRENT_VERSION
+                            + ".");
+        }
+        ByteArrayInputStream in = new ByteArrayInputStream(serialized);
+        DataSplit dataSplit;
+        try {
+            dataSplit = InstantiationUtil.deserializeObject(in, getClass().getClassLoader());
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        }
+        return new PaimonSplit(dataSplit);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
index 97d543148..5edeb482a 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java
@@ -17,18 +17,11 @@
 
 package com.alibaba.fluss.lake.paimon.tiering;
 
+import com.alibaba.fluss.lake.paimon.source.FlussRowAsPaimonRow;
 import com.alibaba.fluss.record.LogRecord;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
 
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
 
@@ -36,19 +29,17 @@ import static 
com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toRowKind;
 import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /** To wrap Fluss {@link LogRecord} as paimon {@link InternalRow}. */
-public class FlussRecordAsPaimonRow implements InternalRow {
+public class FlussRecordAsPaimonRow extends FlussRowAsPaimonRow {
 
     // Lake table for paimon will append three system columns: __bucket, 
__offset,__timestamp
     private static final int LAKE_PAIMON_SYSTEM_COLUMNS = 3;
-    private final RowType tableTowType;
     private final int bucket;
     private LogRecord logRecord;
     private int originRowFieldCount;
-    private com.alibaba.fluss.row.InternalRow internalRow;
 
     public FlussRecordAsPaimonRow(int bucket, RowType tableTowType) {
+        super(tableTowType);
         this.bucket = bucket;
-        this.tableTowType = tableTowType;
     }
 
     public void setFlussRecord(LogRecord logRecord) {
@@ -56,7 +47,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
         this.internalRow = logRecord.getRow();
         this.originRowFieldCount = internalRow.getFieldCount();
         checkState(
-                originRowFieldCount == tableTowType.getFieldCount() - 
LAKE_PAIMON_SYSTEM_COLUMNS,
+                originRowFieldCount == tableRowType.getFieldCount() - 
LAKE_PAIMON_SYSTEM_COLUMNS,
                 "The paimon table fields count must equals to LogRecord's 
fields count.");
     }
 
@@ -73,42 +64,22 @@ public class FlussRecordAsPaimonRow implements InternalRow {
         return toRowKind(logRecord.getChangeType());
     }
 
-    @Override
-    public void setRowKind(RowKind rowKind) {
-        // do nothing
-    }
-
     @Override
     public boolean isNullAt(int pos) {
         if (pos < originRowFieldCount) {
-            return internalRow.isNullAt(pos);
+            return super.isNullAt(pos);
         }
         // is the last three system fields: bucket, offset, timestamp which 
are never null
         return false;
     }
 
-    @Override
-    public boolean getBoolean(int pos) {
-        return internalRow.getBoolean(pos);
-    }
-
-    @Override
-    public byte getByte(int pos) {
-        return internalRow.getByte(pos);
-    }
-
-    @Override
-    public short getShort(int pos) {
-        return internalRow.getShort(pos);
-    }
-
     @Override
     public int getInt(int pos) {
         if (pos == originRowFieldCount) {
             // bucket system column
             return bucket;
         }
-        return internalRow.getInt(pos);
+        return super.getInt(pos);
     }
 
     @Override
@@ -121,32 +92,7 @@ public class FlussRecordAsPaimonRow implements InternalRow {
             return logRecord.timestamp();
         }
         //  the origin RowData
-        return internalRow.getLong(pos);
-    }
-
-    @Override
-    public float getFloat(int pos) {
-        return internalRow.getFloat(pos);
-    }
-
-    @Override
-    public double getDouble(int pos) {
-        return internalRow.getDouble(pos);
-    }
-
-    @Override
-    public BinaryString getString(int pos) {
-        return BinaryString.fromBytes(internalRow.getString(pos).toBytes());
-    }
-
-    @Override
-    public Decimal getDecimal(int pos, int precision, int scale) {
-        com.alibaba.fluss.row.Decimal flussDecimal = 
internalRow.getDecimal(pos, precision, scale);
-        if (flussDecimal.isCompact()) {
-            return Decimal.fromUnscaledLong(flussDecimal.toUnscaledLong(), 
precision, scale);
-        } else {
-            return Decimal.fromBigDecimal(flussDecimal.toBigDecimal(), 
precision, scale);
-        }
+        return super.getLong(pos);
     }
 
     @Override
@@ -155,62 +101,6 @@ public class FlussRecordAsPaimonRow implements InternalRow 
{
         if (pos == originRowFieldCount + 2) {
             return Timestamp.fromEpochMillis(logRecord.timestamp());
         }
-
-        DataType paimonTimestampType = tableTowType.getTypeAt(pos);
-
-        switch (paimonTimestampType.getTypeRoot()) {
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                if (TimestampNtz.isCompact(precision)) {
-                    return Timestamp.fromEpochMillis(
-                            internalRow.getTimestampNtz(pos, 
precision).getMillisecond());
-                } else {
-                    TimestampNtz timestampNtz = 
internalRow.getTimestampNtz(pos, precision);
-                    return Timestamp.fromEpochMillis(
-                            timestampNtz.getMillisecond(), 
timestampNtz.getNanoOfMillisecond());
-                }
-
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                if (TimestampLtz.isCompact(precision)) {
-                    return Timestamp.fromEpochMillis(
-                            internalRow.getTimestampLtz(pos, 
precision).getEpochMillisecond());
-                } else {
-                    TimestampLtz timestampLtz = 
internalRow.getTimestampLtz(pos, precision);
-                    return Timestamp.fromEpochMillis(
-                            timestampLtz.getEpochMillisecond(),
-                            timestampLtz.getNanoOfMillisecond());
-                }
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported data type to get timestamp: " + 
paimonTimestampType);
-        }
-    }
-
-    @Override
-    public byte[] getBinary(int pos) {
-        return internalRow.getBytes(pos);
-    }
-
-    @Override
-    public Variant getVariant(int pos) {
-        throw new UnsupportedOperationException(
-                "getVariant is not support for Fluss record currently.");
-    }
-
-    @Override
-    public InternalArray getArray(int pos) {
-        throw new UnsupportedOperationException(
-                "getArray is not support for Fluss record currently.");
-    }
-
-    @Override
-    public InternalMap getMap(int pos) {
-        throw new UnsupportedOperationException(
-                "getMap is not support for Fluss record currently.");
-    }
-
-    @Override
-    public InternalRow getRow(int pos, int pos1) {
-        throw new UnsupportedOperationException(
-                "getRow is not support for Fluss record currently.");
+        return super.getTimestamp(pos, precision);
     }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
index 437cf1913..9d51e787b 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonConversions.java
@@ -50,6 +50,21 @@ public class PaimonConversions {
         }
     }
 
+    /**
+     * Maps a Paimon {@link RowKind} to the Fluss {@link ChangeType} with the same name.
+     *
+     * @param rowKind the Paimon row kind to convert
+     * @return the matching Fluss change type
+     * @throws IllegalArgumentException if the row kind is none of the four handled kinds
+     */
+    public static ChangeType toChangeType(RowKind rowKind) {
+        final ChangeType changeType;
+        switch (rowKind) {
+            case INSERT:
+                changeType = ChangeType.INSERT;
+                break;
+            case UPDATE_BEFORE:
+                changeType = ChangeType.UPDATE_BEFORE;
+                break;
+            case UPDATE_AFTER:
+                changeType = ChangeType.UPDATE_AFTER;
+                break;
+            case DELETE:
+                changeType = ChangeType.DELETE;
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported rowKind: " + rowKind);
+        }
+        return changeType;
+    }
+
     public static Identifier toPaimon(TablePath tablePath) {
         return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
new file mode 100644
index 000000000..3fc625501
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/utils/PaimonRowAsFlussRow.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.utils;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.data.Timestamp;
+
+/**
+ * Adapter exposing a Paimon {@link org.apache.paimon.data.InternalRow} through the Fluss {@link
+ * InternalRow} interface.
+ *
+ * <p>The wrapped row can be swapped with {@link #replaceRow}, so one adapter instance can be
+ * reused for many rows. An adapter created with the no-arg constructor wraps no row until {@link
+ * #replaceRow} has been called.
+ */
+public class PaimonRowAsFlussRow implements InternalRow {
+
+    private org.apache.paimon.data.InternalRow paimonRow;
+
+    public PaimonRowAsFlussRow() {}
+
+    public PaimonRowAsFlussRow(org.apache.paimon.data.InternalRow paimonRow) {
+        this.paimonRow = paimonRow;
+    }
+
+    /** Points this adapter at another Paimon row and returns the adapter itself. */
+    public PaimonRowAsFlussRow replaceRow(org.apache.paimon.data.InternalRow paimonRow) {
+        this.paimonRow = paimonRow;
+        return this;
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // plain delegation: these accessors have the same shape in both row interfaces
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public int getFieldCount() {
+        return paimonRow.getFieldCount();
+    }
+
+    @Override
+    public boolean isNullAt(int pos) {
+        return paimonRow.isNullAt(pos);
+    }
+
+    @Override
+    public boolean getBoolean(int pos) {
+        return paimonRow.getBoolean(pos);
+    }
+
+    @Override
+    public byte getByte(int pos) {
+        return paimonRow.getByte(pos);
+    }
+
+    @Override
+    public short getShort(int pos) {
+        return paimonRow.getShort(pos);
+    }
+
+    @Override
+    public int getInt(int pos) {
+        return paimonRow.getInt(pos);
+    }
+
+    @Override
+    public long getLong(int pos) {
+        return paimonRow.getLong(pos);
+    }
+
+    @Override
+    public float getFloat(int pos) {
+        return paimonRow.getFloat(pos);
+    }
+
+    @Override
+    public double getDouble(int pos) {
+        return paimonRow.getDouble(pos);
+    }
+
+    // ------------------------------------------------------------------------------------------
+    // accessors that convert between the Paimon and the Fluss value classes
+    // ------------------------------------------------------------------------------------------
+
+    @Override
+    public BinaryString getChar(int pos, int length) {
+        // the length argument is not used: the whole Paimon string is converted
+        return toFlussString(pos);
+    }
+
+    @Override
+    public BinaryString getString(int pos) {
+        return toFlussString(pos);
+    }
+
+    @Override
+    public Decimal getDecimal(int pos, int precision, int scale) {
+        org.apache.paimon.data.Decimal paimonDecimal = paimonRow.getDecimal(pos, precision, scale);
+        return paimonDecimal.isCompact()
+                ? Decimal.fromUnscaledLong(paimonDecimal.toUnscaledLong(), precision, scale)
+                : Decimal.fromBigDecimal(paimonDecimal.toBigDecimal(), precision, scale);
+    }
+
+    @Override
+    public TimestampNtz getTimestampNtz(int pos, int precision) {
+        Timestamp paimonTimestamp = paimonRow.getTimestamp(pos, precision);
+        long millis = paimonTimestamp.getMillisecond();
+        // compact precisions are built from the milliseconds alone
+        return TimestampNtz.isCompact(precision)
+                ? TimestampNtz.fromMillis(millis)
+                : TimestampNtz.fromMillis(millis, paimonTimestamp.getNanoOfMillisecond());
+    }
+
+    @Override
+    public TimestampLtz getTimestampLtz(int pos, int precision) {
+        Timestamp paimonTimestamp = paimonRow.getTimestamp(pos, precision);
+        long epochMillis = paimonTimestamp.getMillisecond();
+        // compact precisions are built from the milliseconds alone
+        return TimestampLtz.isCompact(precision)
+                ? TimestampLtz.fromEpochMillis(epochMillis)
+                : TimestampLtz.fromEpochMillis(
+                        epochMillis, paimonTimestamp.getNanoOfMillisecond());
+    }
+
+    @Override
+    public byte[] getBinary(int pos, int length) {
+        // the length argument is not used: the Paimon binary value is returned as is
+        return readBinary(pos);
+    }
+
+    @Override
+    public byte[] getBytes(int pos) {
+        return readBinary(pos);
+    }
+
+    /** Converts the Paimon string at {@code pos} to a Fluss {@link BinaryString} via its bytes. */
+    private BinaryString toFlussString(int pos) {
+        return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
+    }
+
+    /** Returns the binary value the Paimon row holds at {@code pos}. */
+    private byte[] readBinary(int pos) {
+        return paimonRow.getBinary(pos);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 609bf68ac..3231fe644 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -27,7 +27,8 @@ import org.junit.jupiter.api.BeforeEach;
 
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 
-class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
+/** Base class for Flink union read tests. */
+public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
 
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
new file mode 100644
index 000000000..b20e1a5f2
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/FlussRowAsPaimonRowTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.record.GenericRecord;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+
+import static com.alibaba.fluss.record.ChangeType.APPEND_ONLY;
+import static com.alibaba.fluss.record.ChangeType.DELETE;
+import static com.alibaba.fluss.record.ChangeType.INSERT;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_AFTER;
+import static com.alibaba.fluss.record.ChangeType.UPDATE_BEFORE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link FlussRowAsPaimonRow}. */
+class FlussRowAsPaimonRowTest {
+    // Wraps a 14-column Fluss row and checks that each Paimon getter returns the value that was
+    // stored in the Fluss row; the last column is left null to cover isNullAt.
+    @Test
+    void testLogTableRecordAllTypes() {
+        // Construct a FlussRowAsPaimonRow instance
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.BooleanType(),
+                        new org.apache.paimon.types.TinyIntType(),
+                        new org.apache.paimon.types.SmallIntType(),
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BigIntType(),
+                        new org.apache.paimon.types.FloatType(),
+                        new org.apache.paimon.types.DoubleType(),
+                        new org.apache.paimon.types.VarCharType(),
+                        new org.apache.paimon.types.DecimalType(5, 2),
+                        new org.apache.paimon.types.DecimalType(20, 0),
+                        new org.apache.paimon.types.LocalZonedTimestampType(6),
+                        new org.apache.paimon.types.TimestampType(6),
+                        new org.apache.paimon.types.BinaryType(),
+                        new org.apache.paimon.types.VarCharType());
+
+        long logOffset = 0;
+        long timeStamp = System.currentTimeMillis();
+        GenericRow genericRow = new GenericRow(14);
+        genericRow.setField(0, true);
+        genericRow.setField(1, (byte) 1);
+        genericRow.setField(2, (short) 2);
+        genericRow.setField(3, 3);
+        genericRow.setField(4, 4L);
+        genericRow.setField(5, 5.1f);
+        genericRow.setField(6, 6.0d);
+        genericRow.setField(7, BinaryString.fromString("string"));
+        // unscaled value 9 with scale 2, asserted as 0.09 below
+        genericRow.setField(8, Decimal.fromUnscaledLong(9, 5, 2));
+        genericRow.setField(9, Decimal.fromBigDecimal(new BigDecimal(10), 20, 
0));
+        genericRow.setField(10, TimestampLtz.fromEpochMillis(1698235273182L, 
5678));
+        genericRow.setField(11, TimestampNtz.fromMillis(1698235273182L, 5678));
+        genericRow.setField(12, new byte[] {1, 2, 3, 4});
+        genericRow.setField(13, null);
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, 
APPEND_ONLY, genericRow);
+        // only the row of the record is handed to the adapter
+        FlussRowAsPaimonRow flussRowAsPaimonRow =
+                new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
+
+        // verify FlussRowAsPaimonRow normal columns
+        assertThat(flussRowAsPaimonRow.getBoolean(0)).isTrue();
+        assertThat(flussRowAsPaimonRow.getByte(1)).isEqualTo((byte) 1);
+        assertThat(flussRowAsPaimonRow.getShort(2)).isEqualTo((short) 2);
+        assertThat(flussRowAsPaimonRow.getInt(3)).isEqualTo(3);
+        assertThat(flussRowAsPaimonRow.getLong(4)).isEqualTo(4L);
+        assertThat(flussRowAsPaimonRow.getFloat(5)).isEqualTo(5.1f);
+        assertThat(flussRowAsPaimonRow.getDouble(6)).isEqualTo(6.0d);
+        
assertThat(flussRowAsPaimonRow.getString(7).toString()).isEqualTo("string");
+        assertThat(flussRowAsPaimonRow.getDecimal(8, 5, 2).toBigDecimal())
+                .isEqualTo(new BigDecimal("0.09"));
+        assertThat(flussRowAsPaimonRow.getDecimal(9, 20, 0).toBigDecimal())
+                .isEqualTo(new BigDecimal(10));
+        // both timestamp columns must keep the millisecond and the nano-of-millisecond part
+        assertThat(flussRowAsPaimonRow.getTimestamp(10, 6).getMillisecond())
+                .isEqualTo(1698235273182L);
+        assertThat(flussRowAsPaimonRow.getTimestamp(10, 
6).getNanoOfMillisecond()).isEqualTo(5678);
+        assertThat(flussRowAsPaimonRow.getTimestamp(11, 6).getMillisecond())
+                .isEqualTo(1698235273182L);
+        assertThat(flussRowAsPaimonRow.getTimestamp(11, 
6).getNanoOfMillisecond()).isEqualTo(5678);
+        assertThat(flussRowAsPaimonRow.getBinary(12)).isEqualTo(new byte[] {1, 
2, 3, 4});
+        assertThat(flussRowAsPaimonRow.isNullAt(13)).isTrue();
+    }
+
+    // Builds records with every change type but always wraps logRecord.getRow() only, so the
+    // record's change type is never passed to the adapter; the row kind is expected to be INSERT
+    // in all four cases.
+    @Test
+    void testPrimaryKeyTableRecord() {
+        RowType tableRowType =
+                RowType.of(
+                        new org.apache.paimon.types.IntType(),
+                        new org.apache.paimon.types.BooleanType());
+
+        long logOffset = 0;
+        long timeStamp = System.currentTimeMillis();
+        GenericRow genericRow = new GenericRow(2);
+        genericRow.setField(0, 10);
+        genericRow.setField(1, true);
+
+        LogRecord logRecord = new GenericRecord(logOffset, timeStamp, INSERT, 
genericRow);
+        FlussRowAsPaimonRow flussRowAsPaimonRow =
+                new FlussRowAsPaimonRow(logRecord.getRow(), tableRowType);
+
+        assertThat(flussRowAsPaimonRow.getInt(0)).isEqualTo(10);
+        // verify the row kind reported for a row taken from an INSERT record
+        assertThat(flussRowAsPaimonRow.getRowKind()).isEqualTo(RowKind.INSERT);
+
+        // the remaining change types are expected to report INSERT as well
+        logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_BEFORE, 
genericRow);
+        assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), 
tableRowType).getRowKind())
+                .isEqualTo(RowKind.INSERT);
+
+        logRecord = new GenericRecord(logOffset, timeStamp, UPDATE_AFTER, 
genericRow);
+        assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), 
tableRowType).getRowKind())
+                .isEqualTo(RowKind.INSERT);
+
+        logRecord = new GenericRecord(logOffset, timeStamp, DELETE, 
genericRow);
+        assertThat(new FlussRowAsPaimonRow(logRecord.getRow(), 
tableRowType).getRowKind())
+                .isEqualTo(RowKind.INSERT);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
new file mode 100644
index 000000000..eb9b7a1ca
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonRecordReaderTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.paimon.utils.PaimonRowAsFlussRow;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonRecordReader}. */
+class PaimonRecordReaderTest extends PaimonSourceTestBase {
+    // Runs once before all tests of this class and delegates to PaimonSourceTestBase.beforeAll();
+    // what that set-up prepares is defined in the base class.
+    @BeforeAll
+    protected static void beforeAll() {
+        PaimonSourceTestBase.beforeAll();
+    }
+
+    /**
+     * Reads every planned split of a log table through the lake source and checks that the rows
+     * read match the rows collected by {@code prepareLogTable}, for both a partitioned and a
+     * non-partitioned table.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testReadLogTable(boolean isPartitioned) throws Exception {
+        // first of all, create table and prepare data
+        String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+        List<InternalRow> writtenRows = new ArrayList<>();
+        prepareLogTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows);
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+        Table table = getTable(tablePath);
+        Snapshot snapshot = table.latestSnapshot().get();
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+        // the planner must produce the same splits as scanning the paimon table directly
+        List<Split> splits = ((FileStoreTable) table).newScan().plan().splits();
+        assertThat(splits).hasSize(paimonSplits.size());
+        assertThat(splits)
+                .isEqualTo(
+                        paimonSplits.stream()
+                                .map(PaimonSplit::dataSplit)
+                                .collect(Collectors.toList()));
+
+        List<Row> actual = new ArrayList<>();
+
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRow.createFieldGetters(getFlussRowType(isPartitioned));
+        for (PaimonSplit paimonSplit : paimonSplits) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+            // try-with-resources so the iterator is closed even when the conversion fails
+            try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+                actual.addAll(
+                        convertToFlinkRow(
+                                fieldGetters,
+                                TransformingCloseableIterator.transform(
+                                        iterator, LogRecord::getRow)));
+            }
+        }
+        List<Row> expectRows =
+                convertToFlinkRow(fieldGetters, CloseableIterator.wrap(writtenRows.iterator()));
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+    }
+
+    /**
+     * Reads a non-partitioned log table through the lake source with the projection {5, 1, 3}
+     * (float, tinyint and int columns) and compares the result with reading the Paimon table
+     * directly using the same projection.
+     */
+    @Test
+    void testReadLogTableWithProject() throws Exception {
+        // first of all, create table and prepare data
+        String tableName = "logTable_non_partitioned";
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+        List<InternalRow> writtenRows = new ArrayList<>();
+        prepareLogTable(tablePath, false, DEFAULT_BUCKET_NUM, writtenRows);
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+        lakeSource.withProject(new int[][] {new int[] {5}, new int[] {1}, new int[] {3}});
+        Table table = getTable(tablePath);
+        Snapshot snapshot = table.latestSnapshot().get();
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+        List<Row> actual = new ArrayList<>();
+
+        // getters for the projected row type, in projection order
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRow.createFieldGetters(
+                        RowType.of(new FloatType(), new TinyIntType(), new IntType()));
+        for (PaimonSplit paimonSplit : paimonSplits) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+            // try-with-resources so the iterator is closed even when the conversion fails
+            try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+                actual.addAll(
+                        convertToFlinkRow(
+                                fieldGetters,
+                                TransformingCloseableIterator.transform(
+                                        iterator, LogRecord::getRow)));
+            }
+        }
+
+        // expected rows: read the paimon table directly with the same projection
+        List<Row> expectRows = new ArrayList<>();
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {5, 1, 3});
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        try (org.apache.paimon.reader.RecordReader<org.apache.paimon.data.InternalRow>
+                recordReader = readBuilder.newRead().createReader(splits)) {
+            org.apache.paimon.utils.CloseableIterator<org.apache.paimon.data.InternalRow>
+                    closeableIterator = recordReader.toCloseableIterator();
+            PaimonRowAsFlussRow paimonRowAsFlussRow = new PaimonRowAsFlussRow();
+            expectRows.addAll(
+                    convertToFlinkRow(
+                            fieldGetters,
+                            TransformingCloseableIterator.transform(
+                                    CloseableIterator.wrap(closeableIterator),
+                                    paimonRowAsFlussRow::replaceRow)));
+        }
+        assertThat(actual).containsExactlyInAnyOrderElementsOf(expectRows);
+    }
+
+    /**
+     * Returns the Fluss row type describing the full-type data columns (boolean through binary).
+     *
+     * @param isPartitioned when {@code true}, a trailing STRING field is appended after the
+     *     binary field
+     * @return a row type with 15 fields, or 16 fields when {@code isPartitioned} is set
+     */
+    private RowType getFlussRowType(boolean isPartitioned) {
+        if (!isPartitioned) {
+            return RowType.of(
+                    new BooleanType(),
+                    new TinyIntType(),
+                    new SmallIntType(),
+                    new IntType(),
+                    new BigIntType(),
+                    new FloatType(),
+                    new DoubleType(),
+                    new StringType(),
+                    new DecimalType(5, 2),
+                    new DecimalType(20, 0),
+                    new LocalZonedTimestampType(3),
+                    new LocalZonedTimestampType(6),
+                    new TimestampType(3),
+                    new TimestampType(6),
+                    new BinaryType(4));
+        }
+
+        // Partitioned layout: the same leading fields plus one trailing STRING field.
+        return RowType.of(
+                new BooleanType(),
+                new TinyIntType(),
+                new SmallIntType(),
+                new IntType(),
+                new BigIntType(),
+                new FloatType(),
+                new DoubleType(),
+                new StringType(),
+                new DecimalType(5, 2),
+                new DecimalType(20, 0),
+                new LocalZonedTimestampType(3),
+                new LocalZonedTimestampType(6),
+                new TimestampType(3),
+                new TimestampType(6),
+                new BinaryType(4),
+                new StringType());
+    }
+
+    /**
+     * Creates the full-type log table and writes 10 rows into it.
+     *
+     * @param tablePath path of the table to create and fill
+     * @param isPartitioned whether to create the partitioned layout; rows are then written with
+     *     the partition value {@code "test"}
+     * @param bucketNum bucket number forwarded to {@code createFullTypeLogTable}
+     * @param rows output list that receives the Fluss rows that were written
+     */
+    private void prepareLogTable(
+            TablePath tablePath, boolean isPartitioned, int bucketNum, List<InternalRow> rows)
+            throws Exception {
+        createFullTypeLogTable(tablePath, isPartitioned, bucketNum);
+
+        String partition = null;
+        if (isPartitioned) {
+            partition = "test";
+        }
+        List<InternalRow> written = writeFullTypeRows(tablePath, 10, partition);
+        rows.addAll(written);
+    }
+
+    /**
+     * Creates a Paimon table at {@code tablePath} with one data column per covered type,
+     * followed by the {@code __bucket}, {@code __offset} and {@code __timestamp} columns. No
+     * primary key is declared.
+     *
+     * <p>When {@code isPartitioned} is set, a STRING column {@code p} is added as partition key
+     * and the bucket options are configured. Note that {@code bucketNum} is only applied in that
+     * case; for a non-partitioned table no bucket option is set here.
+     *
+     * @param tablePath path of the table to create
+     * @param isPartitioned whether to add the partition column and the bucket options
+     * @param bucketNum value for the bucket option (partitioned layout only)
+     */
+    private void createFullTypeLogTable(TablePath tablePath, boolean isPartitioned, int bucketNum)
+            throws Exception {
+        Schema.Builder builder = Schema.newBuilder();
+
+        // data columns
+        builder.column("f_boolean", DataTypes.BOOLEAN());
+        builder.column("f_byte", DataTypes.TINYINT());
+        builder.column("f_short", DataTypes.SMALLINT());
+        builder.column("f_int", DataTypes.INT());
+        builder.column("f_long", DataTypes.BIGINT());
+        builder.column("f_float", DataTypes.FLOAT());
+        builder.column("f_double", DataTypes.DOUBLE());
+        builder.column("f_string", DataTypes.STRING());
+        builder.column("f_decimal1", DataTypes.DECIMAL(5, 2));
+        builder.column("f_decimal2", DataTypes.DECIMAL(20, 0));
+        builder.column("f_timestamp_ltz1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3));
+        builder.column("f_timestamp_ltz2", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6));
+        builder.column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3));
+        builder.column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6));
+        builder.column("f_binary", DataTypes.BINARY(4));
+
+        if (isPartitioned) {
+            // the partition column must precede the trailing system columns
+            builder.column("p", DataTypes.STRING());
+            builder.partitionKeys("p");
+            builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+            builder.option(CoreOptions.BUCKET_KEY.key(), "f_long");
+        }
+
+        // system columns
+        builder.column("__bucket", DataTypes.INT());
+        builder.column("__offset", DataTypes.BIGINT());
+        builder.column("__timestamp", DataTypes.TIMESTAMP(6));
+
+        createTable(tablePath, builder.build());
+    }
+
+    /**
+     * Writes {@code rowCount} full-type rows into the table at {@code tablePath}.
+     *
+     * <p>Row {@code i} carries {@code i + 400L} in the long field, {@code "another_string_" + i}
+     * in the string field and {@code i} as its offset; the remaining data values are constant.
+     *
+     * @param tablePath path of the table to write to
+     * @param rowCount number of rows to generate
+     * @param partition partition value placed after the binary field, or {@code null} to
+     *     generate rows without a partition field
+     * @return the generated Fluss rows, in generation order
+     */
+    private List<InternalRow> writeFullTypeRows(
+            TablePath tablePath, int rowCount, @Nullable String partition) throws Exception {
+        List<org.apache.paimon.data.InternalRow> paimonRows = new ArrayList<>();
+        List<InternalRow> flussRows = new ArrayList<>();
+        Table table = getTable(tablePath);
+
+        for (int i = 0; i < rowCount; i++) {
+            com.alibaba.fluss.row.GenericRow flussRow;
+            if (partition != null) {
+                // partitioned layout: the partition value sits between data and system columns
+                flussRow =
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                30,
+                                i + 400L,
+                                500.1f,
+                                600.0d,
+                                com.alibaba.fluss.row.BinaryString.fromString(
+                                        "another_string_" + i),
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                com.alibaba.fluss.row.BinaryString.fromString(partition),
+                                0,
+                                (long) i,
+                                TimestampNtz.fromMillis(System.currentTimeMillis()));
+            } else {
+                flussRow =
+                        row(
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                30,
+                                i + 400L,
+                                500.1f,
+                                600.0d,
+                                com.alibaba.fluss.row.BinaryString.fromString(
+                                        "another_string_" + i),
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                0,
+                                (long) i,
+                                TimestampNtz.fromMillis(System.currentTimeMillis()));
+            }
+            // the same bookkeeping applies to both layouts
+            paimonRows.add(new FlussRowAsPaimonRow(flussRow, table.rowType()));
+            flussRows.add(flussRow);
+        }
+
+        writeRecord(tablePath, paimonRows);
+        return flussRows;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
new file mode 100644
index 000000000..6efc8706c
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSortedRecordReaderTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSortedRecordReader}. */
+class PaimonSortedRecordReaderTest extends PaimonSourceTestBase {
+    @BeforeAll
+    protected static void beforeAll() {
+        PaimonSourceTestBase.beforeAll();
+    }
+
+    /**
+     * Verifies that every split of a primary-key table is read through a {@link
+     * PaimonSortedRecordReader} and that the rows are returned in the order defined by the
+     * reader's own comparator.
+     */
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testReadPkTable(boolean isPartitioned) throws Exception {
+        // first of all, create table and prepare data
+        String tableName = "pkTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+        TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
+
+        List<InternalRow> writtenRows = new ArrayList<>();
+        preparePkTable(tablePath, isPartitioned, DEFAULT_BUCKET_NUM, writtenRows);
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+        Table table = getTable(tablePath);
+        Snapshot snapshot = table.latestSnapshot().get();
+        List<PaimonSplit> paimonSplits = lakeSource.createPlanner(snapshot::id).plan();
+
+        for (PaimonSplit paimonSplit : paimonSplits) {
+            RecordReader recordReader = lakeSource.createRecordReader(() -> paimonSplit);
+            assertThat(recordReader).isInstanceOf(PaimonSortedRecordReader.class);
+            // try-with-resources: the iterator must be released even when the ordering
+            // assertion below fails, otherwise the underlying reader leaks.
+            try (CloseableIterator<LogRecord> iterator = recordReader.read()) {
+                assertThat(
+                                isSorted(
+                                        TransformingCloseableIterator.transform(
+                                                iterator, LogRecord::getRow),
+                                        ((SortedRecordReader) recordReader).order()))
+                        .isTrue();
+            }
+        }
+    }
+
+    /**
+     * Checks whether the elements produced by {@code iterator} are in non-descending order.
+     *
+     * @param iterator elements to check; it is fully consumed unless a violation is found
+     * @param comparator ordering to check against
+     * @return {@code true} for an empty or single-element iterator, or when no element compares
+     *     greater than its successor
+     */
+    private static <T> boolean isSorted(Iterator<T> iterator, Comparator<? super T> comparator) {
+        if (!iterator.hasNext()) {
+            return true;
+        }
+
+        T previous = iterator.next();
+        while (iterator.hasNext()) {
+            T current = iterator.next();
+            if (comparator.compare(previous, current) > 0) {
+                return false;
+            }
+            previous = current;
+        }
+        return true;
+    }
+
+    /**
+     * Creates the full-type primary-key table and writes 10 rows into it.
+     *
+     * @param rows output list that receives the Fluss rows that were written
+     */
+    private void preparePkTable(
+            TablePath tablePath, boolean isPartitioned, int bucketNum, List<InternalRow> rows)
+            throws Exception {
+        createFullTypePkTable(tablePath, isPartitioned, bucketNum);
+        rows.addAll(writeFullTypeRows(tablePath, 10, isPartitioned ? "test" : null));
+    }
+
+    /**
+     * Creates a primary-key Paimon table whose first column {@code f_int} is the key, followed
+     * by one column per covered type and the {@code __bucket}, {@code __offset} and {@code
+     * __timestamp} columns.
+     *
+     * <p>When {@code isPartitioned} is set, a STRING column {@code p} is added as partition key
+     * and becomes part of the primary key. Both layouts use {@code bucketNum} buckets keyed by
+     * {@code f_int}.
+     */
+    private void createFullTypePkTable(TablePath tablePath, boolean isPartitioned, int bucketNum)
+            throws Exception {
+        Schema.Builder schemaBuilder =
+                Schema.newBuilder()
+                        .column("f_int", DataTypes.INT())
+                        .column("f_boolean", DataTypes.BOOLEAN())
+                        .column("f_byte", DataTypes.TINYINT())
+                        .column("f_short", DataTypes.SMALLINT())
+                        .column("f_long", DataTypes.BIGINT())
+                        .column("f_float", DataTypes.FLOAT())
+                        .column("f_double", DataTypes.DOUBLE())
+                        .column("f_string", DataTypes.STRING())
+                        .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+                        .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+                        .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))
+                        .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+                        .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+                        .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+                        .column("f_binary", DataTypes.BINARY(4));
+
+        if (isPartitioned) {
+            schemaBuilder.column("p", DataTypes.STRING());
+            schemaBuilder.partitionKeys("p");
+            schemaBuilder.primaryKey("f_int", "p");
+        } else {
+            schemaBuilder.primaryKey("f_int");
+        }
+        // The bucket settings are the same for both layouts.
+        schemaBuilder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "f_int");
+
+        schemaBuilder
+                .column("__bucket", DataTypes.INT())
+                .column("__offset", DataTypes.BIGINT())
+                .column("__timestamp", DataTypes.TIMESTAMP(6));
+        createTable(tablePath, schemaBuilder.build());
+    }
+
+    /**
+     * Writes {@code rowCount} full-type rows into the table at {@code tablePath}.
+     *
+     * <p>Row {@code i} carries the key {@code i + 30}, the string {@code "another_string_" + i}
+     * and the offset {@code i}; the remaining data values are constant.
+     *
+     * @param partition partition value placed after the binary field, or {@code null} to
+     *     generate rows without a partition field
+     * @return the generated Fluss rows, in generation order
+     */
+    private List<InternalRow> writeFullTypeRows(
+            TablePath tablePath, int rowCount, @Nullable String partition) throws Exception {
+        List<org.apache.paimon.data.InternalRow> rows = new ArrayList<>();
+        List<InternalRow> flussRows = new ArrayList<>();
+        Table table = getTable(tablePath);
+
+        for (int i = 0; i < rowCount; i++) {
+            com.alibaba.fluss.row.GenericRow flussRow;
+            if (partition == null) {
+                flussRow =
+                        row(
+                                i + 30,
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                400L,
+                                500.1f,
+                                600.0d,
+                                com.alibaba.fluss.row.BinaryString.fromString(
+                                        "another_string_" + i),
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                0,
+                                (long) i,
+                                TimestampNtz.fromMillis(System.currentTimeMillis()));
+            } else {
+                // partitioned layout: the partition value sits between data and system columns
+                flussRow =
+                        row(
+                                i + 30,
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                400L,
+                                500.1f,
+                                600.0d,
+                                com.alibaba.fluss.row.BinaryString.fromString(
+                                        "another_string_" + i),
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                com.alibaba.fluss.row.BinaryString.fromString(partition),
+                                0,
+                                (long) i,
+                                TimestampNtz.fromMillis(System.currentTimeMillis()));
+            }
+            // the same bookkeeping applies to both layouts
+            rows.add(new FlussRowAsPaimonRow(flussRow, table.rowType()));
+            flussRows.add(flussRow);
+        }
+        writeRecord(tablePath, rows);
+        return flussRows;
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
new file mode 100644
index 000000000..93c73d329
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSourceTestBase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.paimon.PaimonLakeStorage;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.apache.flink.types.Row;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Function;
+
+import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
+
+/** Base class for paimon lakehouse test. */
+class PaimonSourceTestBase {
+    protected static final String DEFAULT_DB = "fluss_lakehouse";
+    protected static final String DEFAULT_TABLE = "test_lakehouse_table";
+    protected static final int DEFAULT_BUCKET_NUM = 1;
+
+    private static @TempDir File tempWarehouseDir;
+    protected static PaimonLakeStorage lakeStorage;
+    protected static Catalog paimonCatalog;
+
+    @BeforeAll
+    protected static void beforeAll() {
+        Configuration configuration = new Configuration();
+        configuration.setString("type", "paimon");
+        configuration.setString("warehouse", tempWarehouseDir.toString());
+        lakeStorage = new PaimonLakeStorage(configuration);
+        paimonCatalog =
+                CatalogFactory.createCatalog(
+                        
CatalogContext.create(Options.fromMap(configuration.toMap())));
+    }
+
+    public void createTable(TablePath tablePath, Schema schema) throws 
Exception {
+        paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
+        paimonCatalog.createTable(toPaimon(tablePath), schema, true);
+    }
+
+    public void writeRecord(TablePath tablePath, List<InternalRow> records) 
throws Exception {
+        Table table = getTable(tablePath);
+        BatchWriteBuilder writeBuilder = 
table.newBatchWriteBuilder().withOverwrite();
+        try (BatchTableWrite writer = writeBuilder.newWrite()) {
+            for (InternalRow record : records) {
+                writer.write(record);
+            }
+            List<CommitMessage> messages = writer.prepareCommit();
+            try (BatchTableCommit commit = writeBuilder.newCommit()) {
+                commit.commit(messages);
+            }
+        }
+    }
+
+    public Table getTable(TablePath tablePath) throws Exception {
+        return paimonCatalog.getTable(toPaimon(tablePath));
+    }
+
+    public static List<Row> convertToFlinkRow(
+            com.alibaba.fluss.row.InternalRow.FieldGetter[] fieldGetters,
+            CloseableIterator<com.alibaba.fluss.row.InternalRow> 
flussRowIterator) {
+        List<Row> rows = new ArrayList<>();
+        while (flussRowIterator.hasNext()) {
+            com.alibaba.fluss.row.InternalRow row = flussRowIterator.next();
+            Row flinkRow = new Row(fieldGetters.length);
+            for (int i = 0; i < fieldGetters.length; i++) {
+                flinkRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            rows.add(flinkRow);
+        }
+        return rows;
+    }
+
+    /** Adapter for transforming closeable iterator. */
+    public static class TransformingCloseableIterator<T, U> implements 
CloseableIterator<U> {
+        private final CloseableIterator<T> source;
+        private final Function<? super T, ? extends U> transformer;
+
+        public TransformingCloseableIterator(
+                CloseableIterator<T> source, Function<? super T, ? extends U> 
transformer) {
+            this.source = source;
+            this.transformer = transformer;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return source.hasNext();
+        }
+
+        @Override
+        public U next() {
+            return transformer.apply(source.next());
+        }
+
+        @Override
+        public void close() {
+            source.close();
+        }
+
+        public static <T, U> CloseableIterator<U> transform(
+                CloseableIterator<T> source, Function<? super T, ? extends U> 
transformer) {
+            return new TransformingCloseableIterator<>(source, transformer);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
new file mode 100644
index 000000000..0d9e509fe
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitPlannerTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplitPlanner}. */
+class PaimonSplitPlannerTest extends PaimonSourceTestBase {
+    /**
+     * Plans splits through the lake source for the latest snapshot and checks that the wrapped
+     * data splits equal the splits planned directly by a scan of the Paimon table.
+     */
+    @Test
+    void testPlan() throws Exception {
+        // prepare paimon table
+        int bucketNum = 2;
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("c1", DataTypes.INT());
+        schemaBuilder.column("c2", DataTypes.STRING());
+        schemaBuilder.column("c3", DataTypes.STRING());
+        schemaBuilder.partitionKeys("c3");
+        schemaBuilder.primaryKey("c1", "c3");
+        schemaBuilder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        schemaBuilder.option(CoreOptions.BUCKET_KEY.key(), "c1");
+        createTable(tablePath, schemaBuilder.build());
+
+        Identifier identifier =
+                Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
+        Table table = paimonCatalog.getTable(identifier);
+
+        // two records with distinct keys in the same partition
+        writeRecord(
+                tablePath,
+                Arrays.asList(
+                        GenericRow.of(
+                                12, BinaryString.fromString("a"), BinaryString.fromString("A")),
+                        GenericRow.of(
+                                13, BinaryString.fromString("a"), BinaryString.fromString("A"))));
+        Snapshot latestSnapshot = table.latestSnapshot().get();
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+        List<PaimonSplit> plannedSplits = lakeSource.createPlanner(latestSnapshot::id).plan();
+
+        // reference result planned by Paimon itself
+        List<Split> scanSplits = ((FileStoreTable) table).newScan().plan().splits();
+
+        assertThat(scanSplits)
+                .hasSize(plannedSplits.size())
+                .isEqualTo(
+                        plannedSplits.stream()
+                                .map(PaimonSplit::dataSplit)
+                                .collect(Collectors.toList()));
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
new file mode 100644
index 000000000..9f07509be
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitSerializerTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link PaimonSplitSerializer}. */
+class PaimonSplitSerializerTest extends PaimonSourceTestBase {
+    private final PaimonSplitSerializer serializer = new PaimonSplitSerializer();
+
+    /**
+     * Serializes a planned split and checks that deserializing it with the serializer's current
+     * version yields an equal data split.
+     */
+    @Test
+    void testSerializeAndDeserialize() throws Exception {
+        // prepare paimon table
+        int bucketNum = 1;
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .column("c3", DataTypes.STRING());
+        builder.partitionKeys("c3");
+        builder.primaryKey("c1", "c3");
+        builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        createTable(tablePath, builder.build());
+        Table table =
+                paimonCatalog.getTable(
+                        Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName()));
+
+        GenericRow record1 =
+                GenericRow.of(12, BinaryString.fromString("a"), BinaryString.fromString("A"));
+        writeRecord(tablePath, Arrays.asList(record1));
+        Snapshot snapshot = table.latestSnapshot().get();
+
+        LakeSource<PaimonSplit> lakeSource = lakeStorage.createLakeSource(tablePath);
+        List<PaimonSplit> plan = lakeSource.createPlanner(snapshot::id).plan();
+
+        PaimonSplit originalPaimonSplit = plan.get(0);
+        byte[] serialized = serializer.serialize(originalPaimonSplit);
+        PaimonSplit deserialized = serializer.deserialize(serializer.getVersion(), serialized);
+
+        assertThat(deserialized.dataSplit()).isEqualTo(originalPaimonSplit.dataSplit());
+    }
+
+    /** Checks that bytes which are not a serialized split are rejected with an IOException. */
+    @Test
+    void testDeserializeWithInvalidData() {
+        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
+        byte[] invalidData = "invalid".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        // Use the serializer's own version (as the round-trip test does) so that a failure is
+        // attributable to the payload rather than to a hard-coded version number.
+        assertThatThrownBy(() -> serializer.deserialize(serializer.getVersion(), invalidData))
+                .isInstanceOf(IOException.class);
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
new file mode 100644
index 000000000..b0139d1d8
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/source/PaimonSplitTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.alibaba.fluss.lake.paimon.source;
+
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.metadata.TablePath;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test case for {@link PaimonSplit}: checks partition(), bucket() and dataSplit(). */
+class PaimonSplitTest extends PaimonSourceTestBase {
+
+    @Test
+    void testPaimonSplit() throws Exception {
+        // prepare a single-bucket paimon table partitioned by c3 with primary key (c1, c3)
+        int bucketNum = 1;
+        TablePath tablePath = TablePath.of(DEFAULT_DB, DEFAULT_TABLE);
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("c1", DataTypes.INT())
+                        .column("c2", DataTypes.STRING())
+                        .column("c3", DataTypes.STRING());
+        builder.partitionKeys("c3");
+        builder.primaryKey("c1", "c3");
+        builder.option(CoreOptions.BUCKET.key(), String.valueOf(bucketNum));
+        createTable(tablePath, builder.build());
+        Table table =
+                paimonCatalog.getTable(
+                        Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName()));
+
+        GenericRow record1 =
+                GenericRow.of(12, BinaryString.fromString("a"), 
BinaryString.fromString("A"));
+        writeRecord(tablePath, Collections.singletonList(record1)); // single row, c3 = "A"
+        Snapshot snapshot = table.latestSnapshot().get();
+
+        LakeSource<PaimonSplit> lakeSource = 
lakeStorage.createLakeSource(tablePath);
+        List<PaimonSplit> paimonSplits = 
lakeSource.createPlanner(snapshot::id).plan();
+
+        // the only written row has c3 = "A", so the first split must report partition ["A"]
+        PaimonSplit paimonSplit = paimonSplits.get(0);
+        
assertThat(paimonSplit.partition()).isEqualTo(Collections.singletonList("A"));
+
+        List<Split> actualSplits = ((FileStoreTable) 
table).newScan().plan().splits();
+        assertThat(actualSplits.size()).isEqualTo(paimonSplits.size());
+        Split actualSplit = actualSplits.get(0); // split from a direct Paimon scan
+        assertThat(actualSplit).isEqualTo(paimonSplit.dataSplit());
+        assertThat(((DataSplit) 
actualSplit).bucket()).isEqualTo(paimonSplit.bucket());
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index d9c0bebd5..8abb04d35 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -91,7 +91,7 @@ public class FlinkPaimonTieringTestBase {
     protected static Connection conn;
     protected static Admin admin;
     protected static Configuration clientConf;
-    private static String warehousePath;
+    protected static String warehousePath;
     protected static Catalog paimonCatalog;
 
     private static Configuration initConfig() {

Reply via email to