This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new eb3b5f8ab [flink] Union read decouple with paimon for pk table (#1543)
eb3b5f8ab is described below

commit eb3b5f8abf80a39b1465a0afb2c7d821e658f03a
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Aug 18 18:27:38 2025 +0800

    [flink] Union read decouple with paimon for pk table (#1543)
---
 .../fluss/flink/lake/LakeRecordRecordEmitter.java  |   5 +
 .../fluss/flink/lake/LakeSplitGenerator.java       | 119 +-----
 .../fluss/flink/lake/LakeSplitReaderGenerator.java |  15 +-
 .../fluss/flink/lake/LakeSplitSerializer.java      |  63 ++--
 .../flink/lake/LakeSplitStateInitializer.java      |   5 +-
 .../fluss/flink/lake/reader/KeyValueRow.java       |  48 +++
 .../reader/LakeSnapshotAndLogSplitScanner.java     | 252 +++++++++++++
 .../fluss/flink/lake/reader/SortMergeReader.java   | 417 +++++++++++++++++++++
 .../lake/split/LakeSnapshotAndFlussLogSplit.java   | 110 ++++++
 .../state/LakeSnapshotAndFlussLogSplitState.java   |  45 +++
 .../fluss/flink/lake/LakeSplitSerializerTest.java  | 176 +++++++++
 .../flink/lake/reader/SortMergeReaderTest.java     | 161 ++++++++
 12 files changed, 1276 insertions(+), 140 deletions(-)
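
Taken together, the changes below replace the Paimon-specific split classes
(PaimonSnapshotAndFlussLogSplit and its state/serializer support) with
lake-generic counterparts, so the union read path for primary-key tables
depends only on the LakeSource/LakeSplit abstractions. One pattern that recurs
throughout: each split state stores how many records have already been emitted
(setRecordsToSkip), which is what allows a reader restored from a checkpoint to
resume mid-split. A minimal sketch of that pattern, using hypothetical
stand-in types rather than the actual Fluss classes:

    import java.util.function.Consumer;

    final class ProgressSketch {
        // Hypothetical stand-in for a split state that records read progress.
        static final class SplitState {
            long recordsToSkip;
        }

        // Mirrors the emitter pattern: record progress, then emit, so a restored
        // reader can skip exactly the records that were already handed downstream.
        static <T> void emit(T record, long readRecordsCount, SplitState state, Consumer<T> out) {
            state.recordsToSkip = readRecordsCount;
            out.accept(record);
        }
    }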

diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 207422ed8..9e6848864 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -18,6 +18,7 @@
 package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
@@ -52,6 +53,10 @@ public class LakeRecordRecordEmitter<OUT> {
         } else if (splitState instanceof LakeSnapshotSplitState) {
             ((LakeSnapshotSplitState) splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+        } else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
+            ((LakeSnapshotAndFlussLogSplitState) splitState)
+                    .setRecordsToSkip(recordAndPos.readRecordsCount());
+            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
                     "Unknown split state type: " + splitState.getClass());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
index 8c148ccf5..1d8198a1f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -19,8 +19,8 @@ package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
 import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
@@ -30,17 +30,6 @@ import com.alibaba.fluss.metadata.PartitionInfo;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.InnerTableScan;
-
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -53,8 +42,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
-import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /** A generator for lake splits. */
 public class LakeSplitGenerator {
@@ -86,10 +73,6 @@ public class LakeSplitGenerator {
         // get the file store
         LakeSnapshot lakeSnapshotInfo =
                 flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
-        FileStoreTable fileStoreTable =
-                getTable(
-                        lakeSnapshotInfo.getSnapshotId(),
-                        extractLakeCatalogProperties(tableInfo.getProperties()));
 
         boolean isLogTable = !tableInfo.hasPrimaryKey();
         boolean isPartitioned = tableInfo.isPartitioned();
@@ -113,17 +96,13 @@ public class LakeSplitGenerator {
                     lakeSplits,
                     isLogTable,
                     lakeSnapshotInfo.getTableBucketsOffset(),
-                    partitionNameById,
-                    fileStoreTable);
+                    partitionNameById);
         } else {
             Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
                     lakeSplits.values().iterator().next();
             // non-partitioned table
             return generateNoPartitionedTableSplit(
-                    nonPartitionLakeSplits,
-                    isLogTable,
-                    lakeSnapshotInfo.getTableBucketsOffset(),
-                    fileStoreTable);
+                    nonPartitionLakeSplits, isLogTable, lakeSnapshotInfo.getTableBucketsOffset());
         }
     }
 
@@ -145,8 +124,7 @@ public class LakeSplitGenerator {
             Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Long, String> partitionNameById,
-            @Nullable FileStoreTable fileStoreTable)
+            Map<Long, String> partitionNameById)
             throws Exception {
         List<SourceSplitBase> splits = new ArrayList<>();
         Map<String, Long> flussPartitionIdByName =
@@ -181,8 +159,7 @@ public class LakeSplitGenerator {
                                 partitionName,
                                 isLogTable,
                                 tableBucketSnapshotLogOffset,
-                                bucketEndOffset,
-                                fileStoreTable));
+                                bucketEndOffset));
 
             } else {
                 // only lake data
@@ -216,8 +193,7 @@ public class LakeSplitGenerator {
                             isLogTable,
                             // pass empty map since we won't read lake splits
                             Collections.emptyMap(),
-                            bucketEndOffset,
-                            fileStoreTable));
+                            bucketEndOffset));
         }
         return splits;
     }
@@ -228,8 +204,7 @@ public class LakeSplitGenerator {
             @Nullable String partitionName,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Integer, Long> bucketEndOffset,
-            @Nullable FileStoreTable fileStoreTable) {
+            Map<Integer, Long> bucketEndOffset) {
         List<SourceSplitBase> splits = new ArrayList<>();
         if (isLogTable) {
             if (lakeSplits != null) {
@@ -264,12 +239,9 @@ public class LakeSplitGenerator {
                         new TableBucket(tableInfo.getTableId(), partitionId, bucket);
                 Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
                 long stoppingOffset = bucketEndOffset.get(bucket);
-                FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
-
                 splits.add(
                         generateSplitForPrimaryKeyTableBucket(
-                                fileStoreTable,
-                                splitGenerator,
+                                lakeSplits != null ? lakeSplits.get(bucket) : null,
                                 tableBucket,
                                 partitionName,
                                 snapshotLogOffset,
@@ -295,83 +267,26 @@ public class LakeSplitGenerator {
     }
 
     private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
-            FileStoreTable fileStoreTable,
-            FileStoreSourceSplitGenerator splitGenerator,
+            @Nullable List<LakeSplit> lakeSplits,
             TableBucket tableBucket,
             @Nullable String partitionName,
             @Nullable Long snapshotLogOffset,
             long stoppingOffset) {
-
         // no snapshot data or no corresponding log offset for this bucket,
         // so we can only scan from the change log
         if (snapshotLogOffset == null || snapshotLogOffset < 0) {
-            return new PaimonSnapshotAndFlussLogSplit(
+            return new LakeSnapshotAndFlussLogSplit(
                     tableBucket, partitionName, null, EARLIEST_OFFSET, stoppingOffset);
         }
 
-        // then, generate a split contains
-        // snapshot and change log so that we can merge change log and snapshot
-        // to get the full data
-        fileStoreTable =
-                fileStoreTable.copy(
-                        Collections.singletonMap(
-                                CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
-                                // we set a max size to make sure only one splits
-                                MemorySize.MAX_VALUE.toString()));
-        InnerTableScan tableScan =
-                fileStoreTable.newScan().withBucketFilter((b) -> b == tableBucket.getBucket());
-
-        if (partitionName != null) {
-            tableScan =
-                    tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
-        }
-
-        List<FileStoreSourceSplit> fileStoreSourceSplits =
-                splitGenerator.createSplits(tableScan.plan());
-
-        checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key table must be 1.");
-        FileStoreSourceSplit fileStoreSourceSplit = fileStoreSourceSplits.get(0);
-        return new PaimonSnapshotAndFlussLogSplit(
-                tableBucket,
-                partitionName,
-                fileStoreSourceSplit,
-                snapshotLogOffset,
-                stoppingOffset);
-    }
-
-    private Map<String, String> getPartitionSpec(
-            FileStoreTable fileStoreTable, String partitionName) {
-        List<String> partitionKeys = fileStoreTable.partitionKeys();
-        checkState(
-                partitionKeys.size() == 1,
-                "Must only one partition key for paimon table %, but got %s, the partition keys are: ",
-                tableInfo.getTablePath(),
-                partitionKeys.size(),
-                partitionKeys);
-        return Collections.singletonMap(partitionKeys.get(0), partitionName);
-    }
-
-    private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProperties)
-            throws Exception {
-        try (Catalog catalog =
-                FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
-            return (FileStoreTable)
-                    catalog.getTable(
-                                    Identifier.create(
-                                            tableInfo.getTablePath().getDatabaseName(),
-                                            tableInfo.getTablePath().getTableName()))
-                            .copy(
-                                    Collections.singletonMap(
-                                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
-                                            String.valueOf(snapshotId)));
-        }
+        return new LakeSnapshotAndFlussLogSplit(
+                tableBucket, partitionName, lakeSplits, snapshotLogOffset, stoppingOffset);
     }
 
     private List<SourceSplitBase> generateNoPartitionedTableSplit(
             Map<Integer, List<LakeSplit>> lakeSplits,
             boolean isLogTable,
-            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            FileStoreTable fileStoreTable) {
+            Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
         // iterate all buckets
         // assume bucket ids run from 0 to bucket count
         Map<Integer, Long> bucketEndOffset =
@@ -380,12 +295,6 @@ public class LakeSplitGenerator {
                         IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
                         bucketOffsetsRetriever);
         return generateSplit(
-                lakeSplits,
-                null,
-                null,
-                isLogTable,
-                tableBucketSnapshotLogOffset,
-                bucketEndOffset,
-                fileStoreTable);
+                lakeSplits, null, null, isLogTable, tableBucketSnapshotLogOffset, bucketEndOffset);
     }
 }
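
For a primary-key table the generator now decides per bucket, purely from
Fluss metadata, whether the split carries lake snapshot data: a missing or
negative snapshot log offset yields a change-log-only split read from the
earliest offset, otherwise the bucket's lake splits are paired with the log
tail starting at the snapshot's offset. A condensed sketch of that decision
with stand-in types (the EARLIEST_OFFSET value is assumed here; the real
constant is LogScanner.EARLIEST_OFFSET):

    import java.util.List;

    final class PkBucketSplitSketch {
        static final long EARLIEST_OFFSET = -2L; // assumed sentinel value

        final List<String> lakeFiles; // stand-in for List<LakeSplit>; null = no snapshot data
        final long startingLogOffset;
        final long stoppingLogOffset;

        private PkBucketSplitSketch(List<String> lakeFiles, long start, long stop) {
            this.lakeFiles = lakeFiles;
            this.startingLogOffset = start;
            this.stoppingLogOffset = stop;
        }

        // Mirrors generateSplitForPrimaryKeyTableBucket above: a missing or
        // negative snapshot log offset degrades to a change-log-only split.
        static PkBucketSplitSketch forBucket(
                List<String> lakeFiles, Long snapshotLogOffset, long stoppingOffset) {
            if (snapshotLogOffset == null || snapshotLogOffset < 0) {
                return new PkBucketSplitSketch(null, EARLIEST_OFFSET, stoppingOffset);
            }
            return new PkBucketSplitSketch(lakeFiles, snapshotLogOffset, stoppingOffset);
        }
    }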
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index 02e5e5e5b..410bef73d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -18,7 +18,9 @@
 package com.alibaba.fluss.flink.lake;
 
 import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
 import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
@@ -73,7 +75,8 @@ public class LakeSplitReaderGenerator {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+            boundedSplits.add(split);
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", split.getClass()));
@@ -112,7 +115,15 @@ public class LakeSplitReaderGenerator {
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
             return new BoundedSplitReader(
                     lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    (LakeSnapshotAndFlussLogSplit) split;
+            LakeSnapshotAndLogSplitScanner lakeSnapshotAndLogSplitScanner =
+                    new LakeSnapshotAndLogSplitScanner(
+                            table, lakeSource, lakeSnapshotAndFlussLogSplit, projectedFields);
+            return new BoundedSplitReader(
+                    lakeSnapshotAndLogSplitScanner,
+                    lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", split.getClass()));
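
Note that the new branch seeds BoundedSplitReader with getRecordsToSkip():
on recovery the scan is re-run and the records that were already emitted
before the checkpoint are dropped. A generic sketch of that skip-on-restore
behavior (assumed semantics, not the actual BoundedSplitReader code):

    import java.util.Iterator;

    final class SkippingIterator<T> implements Iterator<T> {
        private final Iterator<T> inner;

        SkippingIterator(Iterator<T> inner, long recordsToSkip) {
            this.inner = inner;
            // fast-forward past records emitted before the restore
            for (long i = 0; i < recordsToSkip && inner.hasNext(); i++) {
                inner.next();
            }
        }

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public T next() {
            return inner.next();
        }
    }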
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
index 19b6f29c7..0bfb897a6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
@@ -17,8 +17,8 @@
 
 package com.alibaba.fluss.flink.lake;
 
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.source.split.LogSplit;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
@@ -27,15 +27,15 @@ import com.alibaba.fluss.metadata.TableBucket;
 
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
 import static com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND;
-import static com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
 
 /** A serializer for lake split. */
 public class LakeSplitSerializer {
@@ -52,32 +52,30 @@ public class LakeSplitSerializer {
                     sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
             out.writeInt(serializeBytes.length);
             out.write(serializeBytes);
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
-            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
-                    new FileStoreSourceSplitSerializer();
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             // writing file store source split
-            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
-                    ((PaimonSnapshotAndFlussLogSplit) split);
-            FileStoreSourceSplit fileStoreSourceSplit =
-                    paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
-            if (fileStoreSourceSplit == null) {
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                    ((LakeSnapshotAndFlussLogSplit) split);
+            List<LakeSplit> lakeSplits = lakeSnapshotAndFlussLogSplit.getLakeSplits();
+            if (lakeSplits == null) {
                 // no snapshot data for the bucket
                 out.writeBoolean(false);
             } else {
                 out.writeBoolean(true);
-                byte[] serializeBytes =
-                        fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
-                out.writeInt(serializeBytes.length);
-                out.write(serializeBytes);
+                out.writeInt(lakeSplits.size());
+                for (LakeSplit lakeSplit : lakeSplits) {
+                    byte[] serializeBytes = sourceSplitSerializer.serialize(lakeSplit);
+                    out.writeInt(serializeBytes.length);
+                    out.write(serializeBytes);
+                }
             }
             // writing starting/stopping offset
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
+            out.writeLong(lakeSnapshotAndFlussLogSplit.getStartingOffset());
             out.writeLong(
-                    paimonSnapshotAndFlussLogSplit
+                    lakeSnapshotAndFlussLogSplit
                             .getStoppingOffset()
                             .orElse(LogSplit.NO_STOPPING_OFFSET));
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+            out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported split type: " + split.getClass().getName());
@@ -97,25 +95,26 @@ public class LakeSplitSerializer {
                     sourceSplitSerializer.deserialize(
                             sourceSplitSerializer.getVersion(), serializeBytes);
             return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
-        } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
-            FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
-                    new FileStoreSourceSplitSerializer();
-            FileStoreSourceSplit fileStoreSourceSplit = null;
+        } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
+            List<LakeSplit> lakeSplits = null;
             if (input.readBoolean()) {
-                byte[] serializeBytes = new byte[input.readInt()];
-                input.read(serializeBytes);
-                fileStoreSourceSplit =
-                        fileStoreSourceSplitSerializer.deserialize(
-                                fileStoreSourceSplitSerializer.getVersion(), serializeBytes);
+                int lakeSplitSize = input.readInt();
+                lakeSplits = new ArrayList<>(lakeSplitSize);
+                for (int i = 0; i < lakeSplitSize; i++) {
+                    byte[] serializeBytes = new byte[input.readInt()];
+                    input.read(serializeBytes);
+                    lakeSplits.add(
+                            sourceSplitSerializer.deserialize(
+                                    sourceSplitSerializer.getVersion(), serializeBytes);
+                }
             }
             long startingOffset = input.readLong();
             long stoppingOffset = input.readLong();
             long recordsToSkip = input.readLong();
-            return new PaimonSnapshotAndFlussLogSplit(
+            return new LakeSnapshotAndFlussLogSplit(
                     tableBucket,
                     partition,
-                    fileStoreSourceSplit,
+                    lakeSplits,
                     startingOffset,
                     stoppingOffset,
                     recordsToSkip);
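
Read together, serialize() and deserialize() define the wire layout of the new
split kind: a boolean flag for the presence of lake splits, an optional
count-prefixed list of length-prefixed split payloads, then the starting
offset, stopping offset (NO_STOPPING_OFFSET when absent), and records to skip.
The writer side can be reproduced with Flink's DataOutputSerializer (a sketch
derived from the code above, not a documented format):

    import org.apache.flink.core.memory.DataOutputSerializer;

    import java.io.IOException;

    final class SplitWireFormatSketch {
        // [hasLakeSplits:bool]
        // [n:int, then n x (length:int, bytes:byte[length])]  (only if hasLakeSplits)
        // [startingOffset:long] [stoppingOffset:long] [recordsToSkip:long]
        static byte[] write(byte[][] lakeSplitBytes, long start, long stop, long skip)
                throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(64);
            out.writeBoolean(lakeSplitBytes != null);
            if (lakeSplitBytes != null) {
                out.writeInt(lakeSplitBytes.length);
                for (byte[] bytes : lakeSplitBytes) {
                    out.writeInt(bytes.length);
                    out.write(bytes);
                }
            }
            out.writeLong(start);
            out.writeLong(stop);
            out.writeLong(skip);
            return out.getCopyOfBuffer();
        }
    }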
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index aae8987ad..a8779bce0 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -17,7 +17,9 @@
 
 package com.alibaba.fluss.flink.lake;
 
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
@@ -32,7 +34,8 @@ public class LakeSplitStateInitializer {
             return new PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
         } else if (split instanceof LakeSnapshotSplit) {
             return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
-            // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+        } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+            return new LakeSnapshotAndFlussLogSplitState((LakeSnapshotAndFlussLogSplit) split);
         } else {
             throw new UnsupportedOperationException("Unsupported split type: " + split);
         }
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java
new file mode 100644
index 000000000..abb84a687
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+
+/** An {@link InternalRow} together with its primary-key projection and a delete flag. */
+public class KeyValueRow {
+
+    private final boolean isDelete;
+    private final ProjectedRow pkRow;
+    private final InternalRow valueRow;
+
+    public KeyValueRow(int[] keyIndexes, InternalRow valueRow, boolean isDelete) {
+        this.pkRow = ProjectedRow.from(keyIndexes).replaceRow(valueRow);
+        this.isDelete = isDelete;
+        this.valueRow = valueRow;
+    }
+
+    public boolean isDelete() {
+        return isDelete;
+    }
+
+    public InternalRow keyRow() {
+        return pkRow;
+    }
+
+    public InternalRow valueRow() {
+        return valueRow;
+    }
+}
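
KeyValueRow is a zero-copy key view: ProjectedRow.from(keyIndexes) exposes
only the key columns of the backing row, so the merge path can compare rows
by primary key while valueRow() still returns the full row. A usage sketch
(GenericRow.of and BinaryString.fromString are assumed helpers from the Fluss
row package):

    import com.alibaba.fluss.row.BinaryString;
    import com.alibaba.fluss.row.GenericRow;
    import com.alibaba.fluss.row.InternalRow;

    final class KeyValueRowUsage {
        static void example() {
            // pk index {0} over an (id, name) row: keyRow() is a projected view
            // over the same backing row, no copy is made
            InternalRow fullRow = GenericRow.of(1L, BinaryString.fromString("alice"));
            KeyValueRow kv = new KeyValueRow(new int[] {0}, fullRow, /* isDelete */ false);
            InternalRow key = kv.keyRow();     // exposes only column 0
            InternalRow value = kv.valueRow(); // still the full row
        }
    }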
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
new file mode 100644
index 000000000..e3c835ee0
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
+import com.alibaba.fluss.client.table.scanner.log.LogScanner;
+import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** A scanner to merge the lakehouse's snapshot and change log. */
+public class LakeSnapshotAndLogSplitScanner implements BatchScanner {
+
+    private final LakeSnapshotAndFlussLogSplit lakeSnapshotSplitAndFlussLogSplit;
+    private Comparator<InternalRow> rowComparator;
+    private List<CloseableIterator<LogRecord>> lakeRecordIterators = new ArrayList<>();
+    private final LakeSource<LakeSplit> lakeSource;
+
+    private final int[] pkIndexes;
+
+    // the indexes of the primary key in the rows emitted by the lake source and fluss
+    private int[] keyIndexesInRow;
+    @Nullable private int[] adjustProjectedFields;
+    private final int[] newProjectedFields;
+
+    // the sorted logs in memory, mapping from key -> value
+    private Map<InternalRow, KeyValueRow> logRows;
+
+    private final LogScanner logScanner;
+    private final long stoppingOffset;
+    private boolean logScanFinished;
+
+    private SortMergeReader currentSortMergeReader;
+
+    public LakeSnapshotAndLogSplitScanner(
+            Table table,
+            LakeSource<LakeSplit> lakeSource,
+            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit,
+            @Nullable int[] projectedFields) {
+        this.pkIndexes = table.getTableInfo().getSchema().getPrimaryKeyIndexes();
+        this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
+        this.lakeSource = lakeSource;
+        this.newProjectedFields = getNeedProjectFields(table, projectedFields);
+
+        this.logScanner = table.newScan().project(newProjectedFields).createLogScanner();
+        this.lakeSource.withProject(
+                Arrays.stream(newProjectedFields)
+                        .mapToObj(field -> new int[] {field})
+                        .toArray(int[][]::new));
+
+        TableBucket tableBucket = lakeSnapshotAndFlussLogSplit.getTableBucket();
+        if (tableBucket.getPartitionId() != null) {
+            this.logScanner.subscribe(
+                    tableBucket.getPartitionId(),
+                    tableBucket.getBucket(),
+                    lakeSnapshotAndFlussLogSplit.getStartingOffset());
+        } else {
+            this.logScanner.subscribe(
+                    tableBucket.getBucket(), lakeSnapshotAndFlussLogSplit.getStartingOffset());
+        }
+
+        this.stoppingOffset =
+                lakeSnapshotAndFlussLogSplit
+                        .getStoppingOffset()
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "StoppingOffset is null for split: "
+                                                        + lakeSnapshotAndFlussLogSplit));
+
+        this.logScanFinished = lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+    }
+
+    private int[] getNeedProjectFields(Table flussTable, @Nullable int[] projectedFields) {
+        if (projectedFields != null) {
+            // we need to include the primary key in the projected fields to sort-merge by pk;
+            // if the provided projection doesn't include it, we add it
+            List<Integer> newProjectedFields =
+                    Arrays.stream(projectedFields).boxed().collect(Collectors.toList());
+
+            // the indexes of primary key with new projected fields
+            keyIndexesInRow = new int[pkIndexes.length];
+            for (int i = 0; i < pkIndexes.length; i++) {
+                int primaryKeyIndex = pkIndexes[i];
+                // search the pk in projected fields
+                int indexInProjectedFields = findIndex(projectedFields, primaryKeyIndex);
+                if (indexInProjectedFields >= 0) {
+                    keyIndexesInRow[i] = indexInProjectedFields;
+                } else {
+                    // no pk in projected fields, we must include it to do
+                    // merge sort
+                    newProjectedFields.add(primaryKeyIndex);
+                    keyIndexesInRow[i] = newProjectedFields.size() - 1;
+                }
+            }
+            int[] newProjection = newProjectedFields.stream().mapToInt(Integer::intValue).toArray();
+            // the underlying scan will use the new projection to scan data,
+            // but we still need to map from the new projection back to the original projected fields
+            int[] adjustProjectedFields = new int[projectedFields.length];
+            for (int i = 0; i < projectedFields.length; i++) {
+                adjustProjectedFields[i] = findIndex(newProjection, projectedFields[i]);
+            }
+            this.adjustProjectedFields = adjustProjectedFields;
+            return newProjection;
+        } else {
+            // no projectedFields, use all fields
+            keyIndexesInRow = pkIndexes;
+            return IntStream.range(0, flussTable.getTableInfo().getRowType().getFieldCount())
+                    .toArray();
+        }
+    }
+
+    private int findIndex(int[] array, int target) {
+        int index = -1;
+        for (int i = 0; i < array.length; i++) {
+            if (array[i] == target) {
+                index = i;
+                break;
+            }
+        }
+        return index;
+    }
+
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
+        if (logScanFinished) {
+            if (lakeRecordIterators.isEmpty()) {
+                if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+                        || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+                    lakeRecordIterators = Collections.emptyList();
+                } else {
+                    for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
+                        lakeRecordIterators.add(
+                                lakeSource.createRecordReader(() -> lakeSplit).read());
+                    }
+                }
+            }
+            if (currentSortMergeReader == null) {
+                currentSortMergeReader =
+                        new SortMergeReader(
+                                adjustProjectedFields,
+                                keyIndexesInRow,
+                                lakeRecordIterators,
+                                rowComparator,
+                                CloseableIterator.wrap(
+                                        logRows == null
+                                                ? Collections.emptyIterator()
+                                                : logRows.values().iterator()));
+            }
+            return currentSortMergeReader.readBatch();
+        } else {
+            if (lakeRecordIterators.isEmpty()) {
+                if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+                        || lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+                    lakeRecordIterators = Collections.emptyList();
+                    logRows = new LinkedHashMap<>();
+                } else {
+                    for (LakeSplit lakeSplit : lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
+                        RecordReader reader = lakeSource.createRecordReader(() -> lakeSplit);
+                        if (reader instanceof SortedRecordReader) {
+                            rowComparator = ((SortedRecordReader) reader).order();
+                        } else {
+                            throw new UnsupportedOperationException(
+                                    "lake record reader must be an instance of SortedRecordReader.");
+                        }
+                        lakeRecordIterators.add(reader.read());
+                    }
+                    logRows = new TreeMap<>(rowComparator);
+                }
+            }
+            pollLogRecords(timeout);
+            return CloseableIterator.wrap(Collections.emptyIterator());
+        }
+    }
+
+    private void pollLogRecords(Duration timeout) {
+        ScanRecords scanRecords = logScanner.poll(timeout);
+        for (ScanRecord scanRecord : scanRecords) {
+            boolean isDelete =
+                    scanRecord.getChangeType() == ChangeType.DELETE
+                            || scanRecord.getChangeType() == ChangeType.UPDATE_BEFORE;
+            KeyValueRow keyValueRow =
+                    new KeyValueRow(keyIndexesInRow, scanRecord.getRow(), isDelete);
+            InternalRow keyRow = keyValueRow.keyRow();
+            // upsert the key value row
+            logRows.put(keyRow, keyValueRow);
+            if (scanRecord.logOffset() >= stoppingOffset - 1) {
+                // has reached the end
+                logScanFinished = true;
+                break;
+            }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            if (logScanner != null) {
+                logScanner.close();
+            }
+            if (lakeRecordIterators != null) {
+                for (CloseableIterator<LogRecord> iterator : lakeRecordIterators) {
+                    iterator.close();
+                }
+            }
+        } catch (Exception e) {
+            throw new IOException("Failed to close resources", e);
+        }
+    }
+}
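
pollBatch() above works in two phases. While the change log has not reached
the stopping offset, each poll drains log records into logRows, collapsing the
log to its latest image per primary key: later records for the same key
overwrite earlier ones, and deletes are kept as tombstones so the merge can
suppress matching snapshot rows. Only once the log is exhausted are the sorted
lake iterators handed, together with the buffered log, to the SortMergeReader.
The buffering step behaves like this small simulation over String keys:

    import java.util.Comparator;
    import java.util.TreeMap;

    final class LogBufferSketch {
        public static void main(String[] args) {
            // ordered key -> latest-value buffer standing in for logRows;
            // a null value plays the role of a delete tombstone
            TreeMap<String, String> logRows = new TreeMap<>(Comparator.naturalOrder());
            logRows.put("k1", "v1"); // +I insert
            logRows.put("k1", "v2"); // +U upsert: only the latest image survives
            logRows.put("k2", null); // -D tombstone: suppresses the snapshot row for k2
            System.out.println(logRows); // {k1=v2, k2=null}
        }
    }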
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java
new file mode 100644
index 000000000..f1a52b623
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+/** A sort-merge reader to merge lakehouse snapshot records and the fluss change log. */
+class SortMergeReader {
+
+    private final ProjectedRow snapshotProjectedPkRow;
+    private final CloseableIterator<LogRecord> lakeRecordIterator;
+    private final Comparator<InternalRow> userKeyComparator;
+    private CloseableIterator<KeyValueRow> changeLogIterator;
+
+    private final SnapshotMergedRowIteratorWrapper snapshotMergedRowIteratorWrapper;
+
+    private final ChangeLogIteratorWrapper changeLogIteratorWrapper;
+    private @Nullable final ProjectedRow projectedRow;
+
+    public SortMergeReader(
+            @Nullable int[] projectedFields,
+            int[] pkIndexes,
+            List<CloseableIterator<LogRecord>> lakeRecordIterators,
+            Comparator<InternalRow> userKeyComparator,
+            CloseableIterator<KeyValueRow> changeLogIterator) {
+        this.userKeyComparator = userKeyComparator;
+        this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
+        this.lakeRecordIterator =
+                ConcatRecordIterator.wrap(lakeRecordIterators, userKeyComparator, pkIndexes);
+        this.changeLogIterator = changeLogIterator;
+        this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
+        this.snapshotMergedRowIteratorWrapper = new SnapshotMergedRowIteratorWrapper();
+        // to project to fields provided by user
+        this.projectedRow = projectedFields == null ? null : ProjectedRow.from(projectedFields);
+    }
+
+    @Nullable
+    public CloseableIterator<InternalRow> readBatch() {
+        if (!lakeRecordIterator.hasNext()) {
+            return changeLogIterator.hasNext()
+                    ? changeLogIteratorWrapper.replace(changeLogIterator)
+                    : null;
+        } else {
+            CloseableIterator<SortMergeRows> mergedRecordIterator =
+                    transform(lakeRecordIterator, this::sortMergeWithChangeLog);
+
+            return snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
+        }
+    }
+
+    /** A concat record iterator to concat multiple record iterators. */
+    private static class ConcatRecordIterator implements CloseableIterator<LogRecord> {
+        private final PriorityQueue<SingleElementHeadIterator<LogRecord>> priorityQueue;
+        private final ProjectedRow snapshotProjectedPkRow1;
+        private final ProjectedRow snapshotProjectedPkRow2;
+
+        public ConcatRecordIterator(
+                List<CloseableIterator<LogRecord>> iteratorList,
+                int[] pkIndexes,
+                Comparator<InternalRow> comparator) {
+            this.snapshotProjectedPkRow1 = ProjectedRow.from(pkIndexes);
+            this.snapshotProjectedPkRow2 = ProjectedRow.from(pkIndexes);
+            this.priorityQueue =
+                    new PriorityQueue<>(
+                            Math.max(1, iteratorList.size()),
+                            (s1, s2) ->
+                                    comparator.compare(
+                                            getComparableRow(s1, snapshotProjectedPkRow1),
+                                            getComparableRow(s2, snapshotProjectedPkRow2)));
+            iteratorList.stream()
+                    .filter(Iterator::hasNext)
+                    .map(
+                            iterator ->
+                                    SingleElementHeadIterator.addElementToHead(
+                                            iterator.next(), iterator))
+                    .forEach(priorityQueue::add);
+        }
+
+        public static CloseableIterator<LogRecord> wrap(
+                List<CloseableIterator<LogRecord>> iteratorList,
+                Comparator<InternalRow> comparator,
+                int[] pkIndexes) {
+            if (iteratorList.isEmpty()) {
+                return CloseableIterator.wrap(Collections.emptyIterator());
+            }
+            return new ConcatRecordIterator(iteratorList, pkIndexes, comparator);
+        }
+
+        private InternalRow getComparableRow(
+                SingleElementHeadIterator<LogRecord> iterator, ProjectedRow projectedRow) {
+            return projectedRow.replaceRow(iterator.peek().getRow());
+        }
+
+        @Override
+        public void close() {
+            while (!priorityQueue.isEmpty()) {
+                priorityQueue.poll().close();
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            while (!priorityQueue.isEmpty()) {
+                CloseableIterator<LogRecord> iterator = priorityQueue.peek();
+                if (iterator.hasNext()) {
+                    return true;
+                }
+                priorityQueue.poll().close();
+            }
+            return false;
+        }
+
+        @Override
+        public LogRecord next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            return priorityQueue.peek().next();
+        }
+    }
+
+    private SortMergeRows sortMergeWithChangeLog(InternalRow lakeSnapshotRow) {
+        // no log record, we return the snapshot record
+        if (!changeLogIterator.hasNext()) {
+            return new SortMergeRows(lakeSnapshotRow);
+        }
+        KeyValueRow logKeyValueRow = changeLogIterator.next();
+        // now, let's compare the snapshot row with the log row
+        int compareResult =
+                userKeyComparator.compare(
+                        snapshotProjectedPkRow.replaceRow(lakeSnapshotRow),
+                        logKeyValueRow.keyRow());
+        if (compareResult == 0) {
+            // the snapshot record's key equals the log record's key; if the log record
+            // is a delete, we shouldn't emit the record
+            if (logKeyValueRow.isDelete()) {
+                return SortMergeRows.EMPTY;
+            } else {
+                // return the log record
+                return new SortMergeRows(logKeyValueRow.valueRow());
+            }
+        }
+
+        // the snapshot record is less than the log record, emit the
+        // snapshot record
+        if (compareResult < 0) {
+            // push the log record back into the log iterator so that it
+            // can be advanced again
+            changeLogIterator =
+                    SingleElementHeadIterator.addElementToHead(logKeyValueRow, changeLogIterator);
+            return new SortMergeRows(lakeSnapshotRow);
+        } else {
+            // snapshot record > log record
+            // we should emit the log record first, and still need to iterate the changelog to find
+            // the first change log greater than the snapshot record
+            List<InternalRow> emitRows = new ArrayList<>();
+            emitRows.add(logKeyValueRow.valueRow());
+            boolean shouldEmitSnapshotRecord = true;
+            while (changeLogIterator.hasNext()) {
+                // get the next log record
+                logKeyValueRow = changeLogIterator.next();
+                // compare with the snapshot row,
+                compareResult =
+                        userKeyComparator.compare(
+                                snapshotProjectedPkRow.replaceRow(lakeSnapshotRow),
+                                logKeyValueRow.keyRow());
+                // if snapshot record < the log record
+                if (compareResult < 0) {
+                    // we can break the loop
+                    changeLogIterator =
+                            SingleElementHeadIterator.addElementToHead(
+                                    logKeyValueRow, changeLogIterator);
+                    break;
+                } else if (compareResult > 0) {
+                    // snapshot record > the log record
+                    // the log record should be emitted
+                    emitRows.add(logKeyValueRow.valueRow());
+                } else {
+                    // log record == snapshot record
+                    // the log record should be emitted if it is not a delete, but the
+                    // snapshot record shouldn't be emitted
+                    if (!logKeyValueRow.isDelete()) {
+                        emitRows.add(logKeyValueRow.valueRow());
+                    }
+                    shouldEmitSnapshotRecord = false;
+                }
+            }
+
+            if (shouldEmitSnapshotRecord) {
+                emitRows.add(lakeSnapshotRow);
+            }
+            return new SortMergeRows(emitRows);
+        }
+    }
+
+    private static class SingleElementHeadIterator<T> implements CloseableIterator<T> {
+        private T singleElement;
+        private CloseableIterator<T> inner;
+        private boolean singleElementReturned;
+
+        public SingleElementHeadIterator(T element, CloseableIterator<T> inner) {
+            this.singleElement = element;
+            this.inner = inner;
+            this.singleElementReturned = false;
+        }
+
+        public static <T> SingleElementHeadIterator<T> addElementToHead(
+                T firstElement, CloseableIterator<T> originElementIterator) {
+            if (originElementIterator instanceof SingleElementHeadIterator) {
+                SingleElementHeadIterator<T> singleElementHeadIterator =
+                        (SingleElementHeadIterator<T>) originElementIterator;
+                singleElementHeadIterator.set(firstElement, singleElementHeadIterator.inner);
+                return singleElementHeadIterator;
+            } else {
+                return new SingleElementHeadIterator<>(firstElement, originElementIterator);
+            }
+        }
+
+        public void set(T element, CloseableIterator<T> inner) {
+            this.singleElement = element;
+            this.inner = inner;
+            this.singleElementReturned = false;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !singleElementReturned || inner.hasNext();
+        }
+
+        @Override
+        public T next() {
+            if (singleElementReturned) {
+                return inner.next();
+            }
+            singleElementReturned = true;
+            return singleElement;
+        }
+
+        public T peek() {
+            if (singleElementReturned) {
+                this.singleElement = inner.next();
+                this.singleElementReturned = false;
+                return this.singleElement;
+            }
+            return singleElement;
+        }
+
+        @Override
+        public void close() {
+            inner.close();
+        }
+    }
+
+    private static class ChangeLogIteratorWrapper implements CloseableIterator<InternalRow> {
+        private CloseableIterator<KeyValueRow> changeLogRecordIterator;
+
+        public ChangeLogIteratorWrapper() {}
+
+        public ChangeLogIteratorWrapper replace(
+                CloseableIterator<KeyValueRow> changeLogRecordIterator) {
+            this.changeLogRecordIterator = changeLogRecordIterator;
+            return this;
+        }
+
+        @Override
+        public void close() {
+            if (changeLogRecordIterator != null) {
+                changeLogRecordIterator.close();
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return changeLogRecordIterator != null && changeLogRecordIterator.hasNext();
+        }
+
+        @Override
+        public InternalRow next() {
+            return changeLogRecordIterator.next().valueRow();
+        }
+    }
+
+    private class SnapshotMergedRowIteratorWrapper implements CloseableIterator<InternalRow> {
+        private CloseableIterator<SortMergeRows> currentLakeSnapshotRecords;
+
+        private @Nullable Iterator<InternalRow> currentMergedRows;
+
+        // the row to be returned
+        private @Nullable InternalRow returnedRow;
+
+        public SnapshotMergedRowIteratorWrapper replace(
+                CloseableIterator<SortMergeRows> currentLakeSnapshotRecords) {
+            this.currentLakeSnapshotRecords = currentLakeSnapshotRecords;
+            this.returnedRow = null;
+            this.currentMergedRows = null;
+            return this;
+        }
+
+        @Override
+        public void close() {
+            currentLakeSnapshotRecords.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (returnedRow != null) {
+                return true;
+            }
+            try {
+                // if currentMergedRows is null, we need to get the next mergedRows
+                if (currentMergedRows == null) {
+                    SortMergeRows sortMergeRows =
+                            currentLakeSnapshotRecords.hasNext()
+                                    ? currentLakeSnapshotRecords.next()
+                                    : null;
+                    // if the next mergedRows is not null and not empty, set currentMergedRows
+                    if (sortMergeRows != null && !sortMergeRows.mergedRows.isEmpty()) {
+                        currentMergedRows = sortMergeRows.mergedRows.iterator();
+                    }
+                }
+                // check whether there is a next row; if so, stash it to be returned
+                // by next()
+                if (currentMergedRows != null && currentMergedRows.hasNext()) {
+                    returnedRow = currentMergedRows.next();
+                }
+                return returnedRow != null;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public InternalRow next() {
+            InternalRow returnedRow =
+                    projectedRow == null
+                            ? this.returnedRow
+                            : projectedRow.replaceRow(this.returnedRow);
+            // now we can reset the returned row to null; if no rows remain in the
+            // current merged rows, also set currentMergedRows to null so the next
+            // merged rows can be fetched
+            this.returnedRow = null;
+            if (currentMergedRows != null && !currentMergedRows.hasNext()) {
+                currentMergedRows = null;
+            }
+            return returnedRow;
+        }
+    }
+
+    private static class SortMergeRows {
+        private static final SortMergeRows EMPTY = new SortMergeRows(Collections.emptyList());
+
+        // the rows merged with the change log; one snapshot row may advance multiple
+        // change log records
+        private final List<InternalRow> mergedRows;
+
+        public SortMergeRows(List<InternalRow> mergedRows) {
+            this.mergedRows = mergedRows;
+        }
+
+        public SortMergeRows(InternalRow internalRow) {
+            this.mergedRows = Collections.singletonList(internalRow);
+        }
+    }
+
+    private <R> CloseableIterator<R> transform(
+            CloseableIterator<LogRecord> originElementIterator,
+            final Function<InternalRow, R> function) {
+        return new CloseableIterator<R>() {
+            private final CloseableIterator<LogRecord> inner = originElementIterator;
+
+            @Override
+            public void close() {
+                inner.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return inner.hasNext();
+            }
+
+            @Override
+            public R next() {
+                LogRecord element = inner.next();
+                return function.apply(element.getRow());
+            }
+        };
+    }
+}
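
The comparator cases in sortMergeWithChangeLog boil down to a two-way merge of
sorted runs: equal keys take the log side (dropping the row entirely on a
delete), a smaller snapshot key passes through unchanged, and smaller log keys
are flushed ahead of the snapshot row. A compact, runnable model of those
semantics over integer keys (a null log value stands for a delete):

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    final class SortMergeModel {
        static List<String> merge(int[] snapshotKeys, TreeMap<Integer, String> log) {
            List<String> out = new ArrayList<>();
            Iterator<Map.Entry<Integer, String>> it = log.entrySet().iterator();
            Map.Entry<Integer, String> cur = it.hasNext() ? it.next() : null;
            for (int key : snapshotKeys) {
                // flush log entries with smaller keys ahead of the snapshot row
                while (cur != null && cur.getKey() < key) {
                    if (cur.getValue() != null) out.add("log:" + cur.getValue());
                    cur = it.hasNext() ? it.next() : null;
                }
                if (cur != null && cur.getKey() == key) {
                    // equal keys: the log wins; a delete drops the row entirely
                    if (cur.getValue() != null) out.add("log:" + cur.getValue());
                    cur = it.hasNext() ? it.next() : null;
                } else {
                    out.add("snap:" + key); // snapshot-only key passes through
                }
            }
            while (cur != null) { // remaining log tail
                if (cur.getValue() != null) out.add("log:" + cur.getValue());
                cur = it.hasNext() ? it.next() : null;
            }
            return out;
        }
    }

For example, snapshot keys {1, 3} merged with log {2=b, 3=delete, 4=d} yield
[snap:1, log:b, log:d]: key 3 is suppressed by the delete and key 4 comes from
the log tail.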
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
new file mode 100644
index 000000000..3c5917414
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.split;
+
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A split that combines lake snapshot data with Fluss log data for a bucket. */
+public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
+
+    public static final byte LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
+
+    // may be null when there is no snapshot data for the bucket
+    @Nullable private final List<LakeSplit> lakeSnapshotSplits;
+
+    /** The number of records to skip when resuming reading of this split. */
+    private long recordOffset = 0;
+    // TODO: support skipping file reads using a record fileOffset
+
+    private final long startingOffset;
+    private final long stoppingOffset;
+
+    public LakeSnapshotAndFlussLogSplit(
+            TableBucket tableBucket,
+            @Nullable List<LakeSplit> snapshotSplits,
+            long startingOffset,
+            long stoppingOffset) {
+        this(tableBucket, null, snapshotSplits, startingOffset, stoppingOffset, 0);
+    }
+
+    public LakeSnapshotAndFlussLogSplit(
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            @Nullable List<LakeSplit> snapshotSplits,
+            long startingOffset,
+            long stoppingOffset) {
+        this(tableBucket, partitionName, snapshotSplits, startingOffset, stoppingOffset, 0);
+    }
+
+    public LakeSnapshotAndFlussLogSplit(
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            @Nullable List<LakeSplit> snapshotSplits,
+            long startingOffset,
+            long stoppingOffset,
+            long recordsToSkip) {
+        super(tableBucket, partitionName);
+        this.lakeSnapshotSplits = snapshotSplits;
+        this.startingOffset = startingOffset;
+        this.stoppingOffset = stoppingOffset;
+        this.recordOffset = recordsToSkip;
+    }
+
+    public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long recordsToSkip) {
+        this.recordOffset = recordsToSkip;
+        return this;
+    }
+
+    public long getRecordsToSkip() {
+        return recordOffset;
+    }
+
+    public long getStartingOffset() {
+        return startingOffset;
+    }
+
+    public Optional<Long> getStoppingOffset() {
+        return stoppingOffset >= 0 ? Optional.of(stoppingOffset) : Optional.empty();
+    }
+
+    @Override
+    public boolean isLakeSplit() {
+        return true;
+    }
+
+    @Override
+    protected byte splitKind() {
+        return LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+    }
+
+    @Override
+    public String splitId() {
+        return toSplitId("lake-hybrid-snapshot-log-", tableBucket);
+    }
+
+    @Nullable
+    public List<LakeSplit> getLakeSplits() {
+        return lakeSnapshotSplits;
+    }
+}
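
A minimal construction sketch for the split above; the TableBucket arguments and EARLIEST_OFFSET mirror the tests later in this commit, while the null snapshot-split list (a bucket with no lake data) is a hypothetical value:

    import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
    import com.alibaba.fluss.metadata.TableBucket;

    import java.util.Optional;

    import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;

    class SplitSketch {
        public static void main(String[] args) {
            // Hypothetical bucket (tableId=0, partitionId=1, bucket=0), matching the tests below.
            TableBucket bucket = new TableBucket(0, 1L, 0);
            // null snapshot splits: only the Fluss log will be read for this bucket.
            LakeSnapshotAndFlussLogSplit split =
                    new LakeSnapshotAndFlussLogSplit(
                            bucket, "2025-08-18", null, EARLIEST_OFFSET, 1024L);
            // A negative stoppingOffset means "no stopping offset"; here it is 1024.
            Optional<Long> stop = split.getStoppingOffset(); // Optional.of(1024L)
            System.out.println(split.splitId() + " stops at " + stop.orElse(-1L));
        }
    }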
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
new file mode 100644
index 000000000..65dda119f
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.state;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.flink.source.split.SourceSplitState;
+
+/** The state of {@link LakeSnapshotAndFlussLogSplit}. */
+public class LakeSnapshotAndFlussLogSplitState extends SourceSplitState {
+
+    private long recordsToSkip;
+    private final LakeSnapshotAndFlussLogSplit split;
+
+    public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit split) {
+        super(split);
+        this.recordsToSkip = split.getRecordsToSkip();
+        this.split = split;
+    }
+
+    public void setRecordsToSkip(long recordsToSkip) {
+        this.recordsToSkip = recordsToSkip;
+    }
+
+    @Override
+    public SourceSplitBase toSourceSplit() {
+        return split.updateWithRecordsToSkip(recordsToSkip);
+    }
+}
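
The state is the mutable checkpointing companion of the split: setRecordsToSkip(...) advances the skip count as records are emitted, and toSourceSplit() folds the count back into the split so a restored job can skip what was already read. A small round-trip sketch (the values are illustrative):

    import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
    import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
    import com.alibaba.fluss.metadata.TableBucket;

    import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;

    class SplitStateSketch {
        public static void main(String[] args) {
            LakeSnapshotAndFlussLogSplit split =
                    new LakeSnapshotAndFlussLogSplit(
                            new TableBucket(0, 1L, 0), "2025-08-18", null, EARLIEST_OFFSET, 1024L);
            LakeSnapshotAndFlussLogSplitState state =
                    new LakeSnapshotAndFlussLogSplitState(split);

            // Emitting records advances the skip count.
            state.setRecordsToSkip(42L);

            // On checkpoint the count is folded back into the split.
            LakeSnapshotAndFlussLogSplit restored =
                    (LakeSnapshotAndFlussLogSplit) state.toSourceSplit();
            System.out.println(restored.getRecordsToSkip()); // prints 42
        }
    }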
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
new file mode 100644
index 000000000..b190852fb
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link LakeSplitSerializer}. */
+class LakeSplitSerializerTest {
+    private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
+
+    private static final int SERIALIZER_VERSION = 3;
+
+    private static final byte[] TEST_DATA = "test-lake-split".getBytes();
+
+    private static final int STOPPING_OFFSET = 1024;
+
+    private static final LakeSplit LAKE_SPLIT =
+            new TestLakeSplit(0, Collections.singletonList("2025-08-18"));
+
+    private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
+            new TestSimpleVersionedSerializer();
+
+    private final TableBucket tableBucket = new TableBucket(0, 1L, 0);
+
+    private final LakeSplitSerializer serializer = new LakeSplitSerializer(sourceSplitSerializer);
+
+    @Test
+    void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
+        // Prepare test data
+        LakeSnapshotSplit originalSplit =
+                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT);
+
+        DataOutputSerializer output = new DataOutputSerializer(1024);
+        serializer.serialize(output, originalSplit);
+
+        SourceSplitBase deserializedSplit =
+                serializer.deserialize(
+                        LAKE_SNAPSHOT_SPLIT_KIND,
+                        tableBucket,
+                        "2025-08-18",
+                        new DataInputDeserializer(output.getCopyOfBuffer()));
+
+        assertThat(deserializedSplit).isInstanceOf(LakeSnapshotSplit.class);
+        LakeSnapshotSplit result = (LakeSnapshotSplit) deserializedSplit;
+
+        assertThat(result.getTableBucket()).isEqualTo(tableBucket);
+        assertThat(result.getPartitionName()).isEqualTo("2025-08-18");
+        assertThat(result.getLakeSplit()).isEqualTo(LAKE_SPLIT);
+    }
+
+    @Test
+    void testSerializeAndDeserializeLakeSnapshotAndFlussLogSplit() throws IOException {
+        LakeSnapshotAndFlussLogSplit originalSplit =
+                new LakeSnapshotAndFlussLogSplit(
+                        tableBucket,
+                        "2025-08-18",
+                        Collections.singletonList(LAKE_SPLIT),
+                        EARLIEST_OFFSET,
+                        STOPPING_OFFSET);
+
+        DataOutputSerializer output = new DataOutputSerializer(1024);
+        serializer.serialize(output, originalSplit);
+
+        SourceSplitBase deserializedSplit =
+                serializer.deserialize(
+                        LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND,
+                        tableBucket,
+                        "2025-08-18",
+                        new DataInputDeserializer(output.getCopyOfBuffer()));
+
+        assertThat(deserializedSplit).isInstanceOf(LakeSnapshotAndFlussLogSplit.class);
+        LakeSnapshotAndFlussLogSplit result = (LakeSnapshotAndFlussLogSplit) deserializedSplit;
+
+        assertThat(result.getTableBucket()).isEqualTo(tableBucket);
+        assertThat(result.getPartitionName()).isEqualTo("2025-08-18");
+        assertThat(result.getLakeSplits()).isEqualTo(Collections.singletonList(LAKE_SPLIT));
+        assertThat(result.getStartingOffset()).isEqualTo(EARLIEST_OFFSET);
+        assertThat(result.getStoppingOffset()).hasValue((long) STOPPING_OFFSET);
+    }
+
+    @Test
+    void testDeserializeWithWrongSplitKind() throws IOException {
+        DataOutputSerializer output = new DataOutputSerializer(1024);
+        output.writeInt(0);
+
+        assertThatThrownBy(
+                        () ->
+                                serializer.deserialize(
+                                        (byte) 99,
+                                        tableBucket,
+                                        "2023-10-01",
+                                        new DataInputDeserializer(output.getCopyOfBuffer())))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Unsupported split kind");
+    }
+
+    private static class TestSimpleVersionedSerializer
+            implements SimpleVersionedSerializer<LakeSplit> {
+
+        @Override
+        public byte[] serialize(LakeSplit split) throws IOException {
+            return TEST_DATA;
+        }
+
+        @Override
+        public LakeSplit deserialize(int version, byte[] serialized) throws IOException {
+            return LAKE_SPLIT;
+        }
+
+        @Override
+        public int getVersion() {
+            return SERIALIZER_VERSION;
+        }
+    }
+
+    private static class TestLakeSplit implements LakeSplit {
+
+        private final int bucket;
+        private final List<String> partition;
+
+        public TestLakeSplit(int bucket, List<String> partition) {
+            this.bucket = bucket;
+            this.partition = partition;
+        }
+
+        @Override
+        public String toString() {
+            return "TestLakeSplit";
+        }
+
+        @Override
+        public int bucket() {
+            return bucket;
+        }
+
+        @Override
+        public List<String> partition() {
+            return partition;
+        }
+    }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java
new file mode 100644
index 000000000..dd6bc430d
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SortMergeReader}. */
+class SortMergeReaderTest {
+
+    private static class FlussRowComparator implements Comparator<InternalRow> {
+
+        private final int keyIndex;
+
+        public FlussRowComparator(int keyIndex) {
+            this.keyIndex = keyIndex;
+        }
+
+        @Override
+        public int compare(InternalRow o1, InternalRow o2) {
+            return Integer.compare(o1.getInt(keyIndex), o2.getInt(keyIndex));
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadBatch(boolean isProjected) {
+        int keyIndex = 0;
+        int[] pkIndexes = new int[] {keyIndex};
+        int[] projectedFields = isProjected ? new int[] {keyIndex, 1} : null;
+        List<LogRecord> logRecords1 = createRecords(0, 10, false);
+        List<LogRecord> logRecords2 = createRecords(10, 10, false);
+        List<LogRecord> logRecords3 = createRecords(20, 10, false);
+        List<KeyValueRow> logRecords4 =
+                createRecords(5, 20, true).stream()
+                        .map(logRecord -> new KeyValueRow(pkIndexes, logRecord.getRow(), false))
+                        .collect(Collectors.toList());
+
+        SortMergeReader sortMergeReader =
+                new SortMergeReader(
+                        projectedFields,
+                        new int[] {keyIndex},
+                        Arrays.asList(
+                                CloseableIterator.wrap(logRecords2.iterator()),
+                                CloseableIterator.wrap(logRecords3.iterator()),
+                                CloseableIterator.wrap(logRecords1.iterator())),
+                        new FlussRowComparator(keyIndex),
+                        CloseableIterator.wrap(logRecords4.iterator()));
+
+        List<InternalRow> actualRows = new ArrayList<>();
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRow.createFieldGetters(
+                        isProjected
+                                ? RowType.of(new IntType(), new StringType())
+                                : RowType.of(new IntType(), new StringType(), new StringType()));
+        try (CloseableIterator<InternalRow> iterator = sortMergeReader.readBatch()) {
+            actualRows.addAll(materializeRows(iterator, fieldGetters));
+        }
+        assertThat(actualRows).hasSize(30);
+
+        List<LogRecord> expectedLogRecords = createRecords(0, 5, false);
+        expectedLogRecords.addAll(createRecords(5, 20, true));
+        expectedLogRecords.addAll(createRecords(25, 5, false));
+        ProjectedRow projectedRow = isProjected ? ProjectedRow.from(projectedFields) : null;
+        CloseableIterator<InternalRow> expected =
+                isProjected
+                        ? projected(
+                                CloseableIterator.wrap(expectedLogRecords.iterator()), projectedRow)
+                        : CloseableIterator.wrap(
+                                expectedLogRecords.stream().map(LogRecord::getRow).iterator());
+        assertThat(actualRows).isEqualTo(materializeRows(expected, fieldGetters));
+    }
+
+    private CloseableIterator<InternalRow> projected(
+            CloseableIterator<LogRecord> originElementIterator, final ProjectedRow projectedRow) {
+        return new CloseableIterator<InternalRow>() {
+            private final CloseableIterator<LogRecord> inner = originElementIterator;
+
+            @Override
+            public void close() {
+                inner.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return inner.hasNext();
+            }
+
+            @Override
+            public InternalRow next() {
+                LogRecord element = inner.next();
+                return projectedRow.replaceRow(element.getRow());
+            }
+        };
+    }
+
+    private List<InternalRow> materializeRows(
+            CloseableIterator<InternalRow> iterator, InternalRow.FieldGetter[] fieldGetters) {
+        List<InternalRow> actualRows = new ArrayList<>();
+        while (iterator != null && iterator.hasNext()) {
+            InternalRow row = iterator.next();
+            GenericRow genericRow = new GenericRow(fieldGetters.length);
+            for (int i = 0; i < fieldGetters.length; i++) {
+                genericRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+            }
+            actualRows.add(genericRow);
+        }
+        return actualRows;
+    }
+
+    private List<LogRecord> createRecords(int startId, int count, boolean isLog) {
+        List<LogRecord> logRecords = new ArrayList<>();
+
+        for (int i = 0; i < count; i++) {
+            GenericRow row =
+                    row(
+                            startId + i,
+                            BinaryString.fromString(isLog ? "a" + "_updated" : 
"a"),
+                            BinaryString.fromString(isLog ? "A" + "_updated" : 
"A"));
+            logRecords.add(new ScanRecord(i, System.currentTimeMillis(), 
ChangeType.INSERT, row));
+        }
+        return logRecords;
+    }
+}
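
Concretely, the parameterized test above builds snapshot rows for ids 0..29 (three batches of ten) and change-log updates for ids 5..24, so the expected merged output is 30 rows: snapshot rows 0..4 unchanged, updated rows 5..24 taken from the log, and snapshot rows 25..29 unchanged.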
