This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a4857135 [flink] Remove legacy union read code for batch mode (#1563)
1a4857135 is described below

commit 1a48571354f450597b9dc17820999d7b41f614a8
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Aug 19 10:29:00 2025 +0800

    [flink] Remove legacy union read code for batch mode (#1563)
---
 .../fluss/flink/lake/LakeRecordRecordEmitter.java  |  12 +-
 .../fluss/flink/lake/LakeSplitReaderGenerator.java |  84 +-----
 .../flink/lake/LakeSplitStateInitializer.java      |   6 +-
 .../fluss/flink/lakehouse/LakeSplitGenerator.java  | 311 --------------------
 .../fluss/flink/lakehouse/LakeSplitSerializer.java | 119 --------
 .../flink/lakehouse/paimon/reader/KeyValueRow.java |  47 ---
 .../lakehouse/paimon/reader/PaimonRowWrapper.java  | 117 --------
 .../reader/PaimonSnapshotAndLogSplitScanner.java   | 227 ---------------
 .../paimon/reader/PaimonSnapshotScanner.java       | 127 --------
 .../lakehouse/paimon/reader/ScanRecordWrapper.java | 180 ------------
 .../lakehouse/paimon/reader/SortMergeReader.java   | 322 ---------------------
 .../split/PaimonSnapshotAndFlussLogSplit.java      | 134 ---------
 .../split/PaimonSnapshotAndFlussLogSplitState.java |  45 ---
 .../paimon/split/PaimonSnapshotSplit.java          |  95 ------
 .../paimon/split/PaimonSnapshotSplitState.java     |  52 ----
 .../source/reader/FlinkSourceSplitReader.java      |   5 +-
 .../fluss/flink/lake/LakeSplitSerializerTest.java  |   3 +-
 fluss-test-coverage/pom.xml                        |   1 -
 18 files changed, 6 insertions(+), 1881 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 9e6848864..5b1946985 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -20,8 +20,6 @@ package com.alibaba.fluss.flink.lake;
 import com.alibaba.fluss.client.table.scanner.ScanRecord;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
 import com.alibaba.fluss.flink.source.reader.RecordAndPos;
 import com.alibaba.fluss.flink.source.split.SourceSplitState;
 
@@ -42,15 +40,7 @@ public class LakeRecordRecordEmitter<OUT> {
             SourceSplitState splitState,
             SourceOutput<OUT> sourceOutput,
             RecordAndPos recordAndPos) {
-        if (splitState instanceof PaimonSnapshotSplitState) {
-            ((PaimonSnapshotSplitState) splitState)
-                    .setRecordsToSkip(recordAndPos.readRecordsCount());
-            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
-        } else if (splitState instanceof PaimonSnapshotAndFlussLogSplitState) {
-            ((PaimonSnapshotAndFlussLogSplitState) splitState)
-                    .setRecordsToSkip(recordAndPos.readRecordsCount());
-            sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
-        } else if (splitState instanceof LakeSnapshotSplitState) {
+        if (splitState instanceof LakeSnapshotSplitState) {
             ((LakeSnapshotSplitState) 
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index 410bef73d..86a72f66c 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -22,58 +22,34 @@ import 
com.alibaba.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
 import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
-import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
 import com.alibaba.fluss.flink.source.reader.BoundedSplitReader;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.flink.utils.DataLakeUtils;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.source.LakeSplit;
-import com.alibaba.fluss.metadata.TablePath;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.ReadBuilder;
 
 import javax.annotation.Nullable;
 
 import java.util.Queue;
-import java.util.stream.IntStream;
 
 /** A generator to generate reader for lake split. */
 public class LakeSplitReaderGenerator {
 
     private final Table table;
 
-    private final TablePath tablePath;
-    private FileStoreTable fileStoreTable;
     private final @Nullable int[] projectedFields;
     private final @Nullable LakeSource<LakeSplit> lakeSource;
 
     public LakeSplitReaderGenerator(
             Table table,
-            TablePath tablePath,
             @Nullable int[] projectedFields,
             @Nullable LakeSource<LakeSplit> lakeSource) {
         this.table = table;
-        this.tablePath = tablePath;
         this.projectedFields = projectedFields;
         this.lakeSource = lakeSource;
     }
 
     public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> 
boundedSplits) {
-        if (split instanceof PaimonSnapshotSplit) {
-            boundedSplits.add(split);
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            boundedSplits.add(split);
-        } else if (split instanceof LakeSnapshotSplit) {
+        if (split instanceof LakeSnapshotSplit) {
             boundedSplits.add(split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             boundedSplits.add(split);
@@ -84,32 +60,7 @@ public class LakeSplitReaderGenerator {
     }
 
     public BoundedSplitReader getBoundedSplitScanner(SourceSplitBase split) {
-        if (split instanceof PaimonSnapshotSplit) {
-            PaimonSnapshotSplit paimonSnapshotSplit = (PaimonSnapshotSplit) 
split;
-            FileStoreTable paimonStoreTable = getFileStoreTable();
-            int[] projectedFields = getProjectedFieldsForPaimonTable(table);
-            ReadBuilder readBuilder =
-                    
paimonStoreTable.newReadBuilder().withProjection(projectedFields);
-            PaimonSnapshotScanner paimonSnapshotScanner =
-                    new PaimonSnapshotScanner(
-                            readBuilder.newRead(), 
paimonSnapshotSplit.getFileStoreSourceSplit());
-            return new BoundedSplitReader(
-                    paimonSnapshotScanner,
-                    
paimonSnapshotSplit.getFileStoreSourceSplit().recordsToSkip());
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
-                    (PaimonSnapshotAndFlussLogSplit) split;
-            FileStoreTable paimonStoreTable = getFileStoreTable();
-            PaimonSnapshotAndLogSplitScanner paimonSnapshotAndLogSplitScanner =
-                    new PaimonSnapshotAndLogSplitScanner(
-                            table,
-                            paimonStoreTable,
-                            paimonSnapshotAndFlussLogSplit,
-                            projectedFields);
-            return new BoundedSplitReader(
-                    paimonSnapshotAndLogSplitScanner,
-                    paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
-        } else if (split instanceof LakeSnapshotSplit) {
+        if (split instanceof LakeSnapshotSplit) {
             LakeSnapshotSplit lakeSnapshotSplit = (LakeSnapshotSplit) split;
             LakeSnapshotScanner lakeSnapshotScanner =
                     new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
@@ -129,35 +80,4 @@ public class LakeSplitReaderGenerator {
                     String.format("The split type of %s is not supported.", 
split.getClass()));
         }
     }
-
-    private int[] getProjectedFieldsForPaimonTable(Table flussTable) {
-        return this.projectedFields != null
-                ? this.projectedFields
-                // only read the field in origin fluss table, not include 
log_offset, log_timestamp
-                // fields
-                : IntStream.range(0, 
flussTable.getTableInfo().getRowType().getFieldCount())
-                        .toArray();
-    }
-
-    private FileStoreTable getFileStoreTable() {
-        if (fileStoreTable != null) {
-            return fileStoreTable;
-        }
-
-        try (Catalog paimonCatalog =
-                FlinkCatalogFactory.createPaimonCatalog(
-                        Options.fromMap(
-                                DataLakeUtils.extractLakeCatalogProperties(
-                                        
table.getTableInfo().getProperties())))) {
-            fileStoreTable =
-                    (FileStoreTable)
-                            paimonCatalog.getTable(
-                                    Identifier.create(
-                                            tablePath.getDatabaseName(), 
tablePath.getTableName()));
-            return fileStoreTable;
-        } catch (Exception e) {
-            throw new FlinkRuntimeException(
-                    "Fail to get paimon table.", 
ExceptionUtils.stripExecutionException(e));
-        }
-    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index a8779bce0..bac540257 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -21,8 +21,6 @@ import 
com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
 import com.alibaba.fluss.flink.source.split.SourceSplitBase;
 import com.alibaba.fluss.flink.source.split.SourceSplitState;
 
@@ -30,9 +28,7 @@ import com.alibaba.fluss.flink.source.split.SourceSplitState;
 public class LakeSplitStateInitializer {
 
     public static SourceSplitState initializedState(SourceSplitBase split) {
-        if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            return new 
PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
-        } else if (split instanceof LakeSnapshotSplit) {
+        if (split instanceof LakeSnapshotSplit) {
             return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
             return new 
LakeSnapshotAndFlussLogSplitState((LakeSnapshotAndFlussLogSplit) split);
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
deleted file mode 100644
index 12d010f92..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse;
-
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.client.metadata.LakeSnapshot;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import 
com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
-import com.alibaba.fluss.flink.source.split.LogSplit;
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.PartitionInfo;
-import com.alibaba.fluss.metadata.TableBucket;
-import com.alibaba.fluss.metadata.TableInfo;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.InnerTableScan;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static 
com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
-import static 
com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
-
-/**
- * A generator for lake splits.
- *
- * <p>todo: current is always assume is paimon, make it pluggable.
- */
-public class LakeSplitGenerator {
-
-    private final TableInfo tableInfo;
-    private final Admin flussAdmin;
-    private final OffsetsInitializer.BucketOffsetsRetriever 
bucketOffsetsRetriever;
-    private final OffsetsInitializer stoppingOffsetInitializer;
-    private final int bucketCount;
-
-    public LakeSplitGenerator(
-            TableInfo tableInfo,
-            Admin flussAdmin,
-            OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
-            OffsetsInitializer stoppingOffsetInitializer,
-            int bucketCount) {
-        this.tableInfo = tableInfo;
-        this.flussAdmin = flussAdmin;
-        this.bucketOffsetsRetriever = bucketOffsetsRetriever;
-        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
-        this.bucketCount = bucketCount;
-    }
-
-    public List<SourceSplitBase> generateLakeSplits() throws Exception {
-        // get the file store
-        LakeSnapshot lakeSnapshotInfo =
-                
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
-        FileStoreTable fileStoreTable =
-                getTable(
-                        lakeSnapshotInfo.getSnapshotId(),
-                        
extractLakeCatalogProperties(tableInfo.getProperties()));
-        boolean isLogTable = fileStoreTable.schema().primaryKeys().isEmpty();
-        boolean isPartitioned = 
!fileStoreTable.schema().partitionKeys().isEmpty();
-
-        if (isPartitioned) {
-            List<PartitionInfo> partitionInfos =
-                    
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
-            Map<Long, String> partitionNameById =
-                    partitionInfos.stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            PartitionInfo::getPartitionId,
-                                            PartitionInfo::getPartitionName));
-            return generatePartitionTableSplit(
-                    isLogTable,
-                    fileStoreTable,
-                    lakeSnapshotInfo.getTableBucketsOffset(),
-                    partitionNameById);
-        } else {
-            // non-partitioned table
-            return generateNoPartitionedTableSplit(
-                    isLogTable, fileStoreTable, 
lakeSnapshotInfo.getTableBucketsOffset());
-        }
-    }
-
-    private List<SourceSplitBase> generatePartitionTableSplit(
-            boolean isLogTable,
-            FileStoreTable fileStoreTable,
-            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Long, String> partitionNameById) {
-        List<SourceSplitBase> splits = new ArrayList<>();
-        for (Map.Entry<Long, String> partitionNameByIdEntry : 
partitionNameById.entrySet()) {
-            long partitionId = partitionNameByIdEntry.getKey();
-            String partitionName = partitionNameByIdEntry.getValue();
-            Map<Integer, Long> bucketEndOffset =
-                    stoppingOffsetInitializer.getBucketOffsets(
-                            partitionName,
-                            IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList()),
-                            bucketOffsetsRetriever);
-            splits.addAll(
-                    generateSplit(
-                            fileStoreTable,
-                            partitionId,
-                            partitionName,
-                            isLogTable,
-                            tableBucketSnapshotLogOffset,
-                            bucketEndOffset));
-        }
-        return splits;
-    }
-
-    private List<SourceSplitBase> generateSplit(
-            FileStoreTable fileStoreTable,
-            @Nullable Long partitionId,
-            @Nullable String partitionName,
-            boolean isLogTable,
-            Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Integer, Long> bucketEndOffset) {
-        List<SourceSplitBase> splits = new ArrayList<>();
-        FileStoreSourceSplitGenerator splitGenerator = new 
FileStoreSourceSplitGenerator();
-        if (isLogTable) {
-            // it's log table, we don't care about bucket, and we can't get 
bucket in paimon's
-            // dynamic bucket; so first generate split for the whole paimon 
snapshot,
-            // then generate log split for each bucket paimon snapshot + fluss 
log
-            splits.addAll(
-                    generateSplitForLogSnapshot(
-                            fileStoreTable, splitGenerator, partitionId, 
partitionName));
-            for (int bucket = 0; bucket < bucketCount; bucket++) {
-                TableBucket tableBucket =
-                        new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
-                Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
-                long stoppingOffset = bucketEndOffset.get(bucket);
-                if (snapshotLogOffset == null) {
-                    // no any data commit to this bucket, scan from fluss log
-                    splits.add(
-                            new LogSplit(
-                                    tableBucket, partitionName, 
EARLIEST_OFFSET, stoppingOffset));
-                } else {
-                    // need to read remain fluss log
-                    if (snapshotLogOffset < stoppingOffset) {
-                        splits.add(
-                                new LogSplit(
-                                        tableBucket,
-                                        partitionName,
-                                        snapshotLogOffset,
-                                        stoppingOffset));
-                    }
-                }
-            }
-        } else {
-            // it's primary key table
-            for (int bucket = 0; bucket < bucketCount; bucket++) {
-                TableBucket tableBucket =
-                        new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
-                Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
-                long stoppingOffset = bucketEndOffset.get(bucket);
-                splits.add(
-                        generateSplitForPrimaryKeyTableBucket(
-                                fileStoreTable,
-                                splitGenerator,
-                                tableBucket,
-                                partitionName,
-                                snapshotLogOffset,
-                                stoppingOffset));
-            }
-        }
-
-        return splits;
-    }
-
-    private List<SourceSplitBase> generateSplitForLogSnapshot(
-            FileStoreTable fileStoreTable,
-            FileStoreSourceSplitGenerator splitGenerator,
-            @Nullable Long partitionId,
-            @Nullable String partitionName) {
-        List<SourceSplitBase> splits = new ArrayList<>();
-        // paimon snapshot
-        InnerTableScan scan = fileStoreTable.newScan();
-        if (partitionName != null) {
-            scan = scan.withPartitionFilter(getPartitionSpec(fileStoreTable, 
partitionName));
-        }
-        // for snapshot splits, we always use bucket = -1 ad the bucket since 
we can't get bucket in
-        // paimon's log table
-        TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), 
partitionId, -1);
-        // snapshot splits + one log split
-        for (FileStoreSourceSplit fileStoreSourceSplit : 
splitGenerator.createSplits(scan.plan())) {
-            splits.add(new PaimonSnapshotSplit(tableBucket, partitionName, 
fileStoreSourceSplit));
-        }
-        return splits;
-    }
-
-    private Map<String, String> getPartitionSpec(
-            FileStoreTable fileStoreTable, String partitionName) {
-        List<String> partitionKeys = fileStoreTable.partitionKeys();
-        checkState(
-                partitionKeys.size() == 1,
-                "Must only one partition key for paimon table %, but got %s, 
the partition keys are: ",
-                tableInfo.getTablePath(),
-                partitionKeys.size(),
-                partitionKeys);
-        return Collections.singletonMap(partitionKeys.get(0), partitionName);
-    }
-
-    private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
-            FileStoreTable fileStoreTable,
-            FileStoreSourceSplitGenerator splitGenerator,
-            TableBucket tableBucket,
-            @Nullable String partitionName,
-            @Nullable Long snapshotLogOffset,
-            long stoppingOffset) {
-
-        // no snapshot data for this bucket or no a corresponding log offset 
in this bucket,
-        // can only scan from change log
-        if (snapshotLogOffset == null || snapshotLogOffset < 0) {
-            return new PaimonSnapshotAndFlussLogSplit(
-                    tableBucket, partitionName, null, EARLIEST_OFFSET, 
stoppingOffset);
-        }
-
-        // then, generate a split contains
-        // snapshot and change log so that we can merge change log and snapshot
-        // to get the full data
-        fileStoreTable =
-                fileStoreTable.copy(
-                        Collections.singletonMap(
-                                CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
-                                // we set a max size to make sure only one 
splits
-                                MemorySize.MAX_VALUE.toString()));
-        InnerTableScan tableScan =
-                fileStoreTable.newScan().withBucketFilter((b) -> b == 
tableBucket.getBucket());
-
-        if (partitionName != null) {
-            tableScan =
-                    
tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
-        }
-
-        List<FileStoreSourceSplit> fileStoreSourceSplits =
-                splitGenerator.createSplits(tableScan.plan());
-
-        checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key 
table must be 1.");
-        FileStoreSourceSplit fileStoreSourceSplit = 
fileStoreSourceSplits.get(0);
-        return new PaimonSnapshotAndFlussLogSplit(
-                tableBucket,
-                partitionName,
-                fileStoreSourceSplit,
-                snapshotLogOffset,
-                stoppingOffset);
-    }
-
-    private List<SourceSplitBase> generateNoPartitionedTableSplit(
-            boolean isLogTable,
-            FileStoreTable fileStoreTable,
-            Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
-        // iterate all bucket
-        // assume bucket is from 0 to bucket count
-        Map<Integer, Long> bucketEndOffset =
-                stoppingOffsetInitializer.getBucketOffsets(
-                        null,
-                        IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList()),
-                        bucketOffsetsRetriever);
-        return generateSplit(
-                fileStoreTable,
-                null,
-                null,
-                isLogTable,
-                tableBucketSnapshotLogOffset,
-                bucketEndOffset);
-    }
-
-    private FileStoreTable getTable(long snapshotId, Map<String, String> 
catalogProperties)
-            throws Exception {
-        try (Catalog catalog =
-                
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
-            return (FileStoreTable)
-                    catalog.getTable(
-                                    Identifier.create(
-                                            
tableInfo.getTablePath().getDatabaseName(),
-                                            
tableInfo.getTablePath().getTableName()))
-                            .copy(
-                                    Collections.singletonMap(
-                                            CoreOptions.SCAN_SNAPSHOT_ID.key(),
-                                            String.valueOf(snapshotId)));
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
deleted file mode 100644
index 76c4e4452..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse;
-
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import com.alibaba.fluss.flink.source.split.LogSplit;
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
-import static 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
-import static 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit.PAIMON_SNAPSHOT_SPLIT_KIND;
-
-/** A serializer for lake split. */
-public class LakeSplitSerializer {
-
-    private final FileStoreSourceSplitSerializer 
fileStoreSourceSplitSerializer =
-            new FileStoreSourceSplitSerializer();
-
-    public void serialize(DataOutputSerializer out, SourceSplitBase split) 
throws IOException {
-        FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
-                new FileStoreSourceSplitSerializer();
-        if (split instanceof PaimonSnapshotSplit) {
-            FileStoreSourceSplit fileStoreSourceSplit =
-                    ((PaimonSnapshotSplit) split).getFileStoreSourceSplit();
-            byte[] serializeBytes = 
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
-            out.writeInt(serializeBytes.length);
-            out.write(serializeBytes);
-        } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
-            // writing file store source split
-            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
-                    ((PaimonSnapshotAndFlussLogSplit) split);
-            FileStoreSourceSplit fileStoreSourceSplit =
-                    paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
-            if (fileStoreSourceSplit == null) {
-                // no snapshot data for the bucket
-                out.writeBoolean(false);
-            } else {
-                out.writeBoolean(true);
-                byte[] serializeBytes =
-                        
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
-                out.writeInt(serializeBytes.length);
-                out.write(serializeBytes);
-            }
-            // writing starting/stopping offset
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
-            out.writeLong(
-                    paimonSnapshotAndFlussLogSplit
-                            .getStoppingOffset()
-                            .orElse(LogSplit.NO_STOPPING_OFFSET));
-            out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
-        } else {
-            throw new UnsupportedOperationException(
-                    "Unsupported split type: " + split.getClass().getName());
-        }
-    }
-
-    public SourceSplitBase deserialize(
-            byte splitKind,
-            TableBucket tableBucket,
-            @Nullable String partition,
-            DataInputDeserializer input)
-            throws IOException {
-        if (splitKind == PAIMON_SNAPSHOT_SPLIT_KIND) {
-            byte[] serializeBytes = new byte[input.readInt()];
-            input.read(serializeBytes);
-            FileStoreSourceSplit fileStoreSourceSplit =
-                    fileStoreSourceSplitSerializer.deserialize(
-                            fileStoreSourceSplitSerializer.getVersion(), 
serializeBytes);
-
-            return new PaimonSnapshotSplit(tableBucket, partition, 
fileStoreSourceSplit);
-        } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
-            FileStoreSourceSplit fileStoreSourceSplit = null;
-            if (input.readBoolean()) {
-                byte[] serializeBytes = new byte[input.readInt()];
-                input.read(serializeBytes);
-                fileStoreSourceSplit =
-                        fileStoreSourceSplitSerializer.deserialize(
-                                fileStoreSourceSplitSerializer.getVersion(), 
serializeBytes);
-            }
-            long startingOffset = input.readLong();
-            long stoppingOffset = input.readLong();
-            long recordsToSkip = input.readLong();
-            return new PaimonSnapshotAndFlussLogSplit(
-                    tableBucket,
-                    partition,
-                    fileStoreSourceSplit,
-                    startingOffset,
-                    stoppingOffset,
-                    recordsToSkip);
-        } else {
-            throw new UnsupportedOperationException("Unsupported split kind: " 
+ splitKind);
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
deleted file mode 100644
index 86a7f74f8..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.utils.ProjectedRow;
-
-/** An {@link InternalRow} with the key part. */
-public class KeyValueRow {
-
-    private final boolean isDelete;
-    private final ProjectedRow pkRow;
-    private final InternalRow valueRow;
-
-    public KeyValueRow(int[] indexes, InternalRow valueRow, boolean isDelete) {
-        this.pkRow = ProjectedRow.from(indexes).replaceRow(valueRow);
-        this.valueRow = valueRow;
-        this.isDelete = isDelete;
-    }
-
-    public boolean isDelete() {
-        return isDelete;
-    }
-
-    public InternalRow keyRow() {
-        return pkRow;
-    }
-
-    public InternalRow valueRow() {
-        return valueRow;
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
deleted file mode 100644
index ad508adc7..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.row.BinaryString;
-import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
-
-/**
- * A wrapper for Paimon's InternalRow to bridge the Paimon's InternalRow to 
Fluss's InternalRow. .
- */
-public class PaimonRowWrapper implements InternalRow {
-    private final org.apache.paimon.data.InternalRow paimonRow;
-
-    public PaimonRowWrapper(org.apache.paimon.data.InternalRow paimonRow) {
-        this.paimonRow = paimonRow;
-    }
-
-    @Override
-    public int getFieldCount() {
-        return paimonRow.getFieldCount();
-    }
-
-    @Override
-    public boolean isNullAt(int pos) {
-        return paimonRow.isNullAt(pos);
-    }
-
-    @Override
-    public boolean getBoolean(int pos) {
-        return paimonRow.getBoolean(pos);
-    }
-
-    @Override
-    public byte getByte(int pos) {
-        return paimonRow.getByte(pos);
-    }
-
-    @Override
-    public short getShort(int pos) {
-        return paimonRow.getShort(pos);
-    }
-
-    @Override
-    public int getInt(int pos) {
-        return paimonRow.getInt(pos);
-    }
-
-    @Override
-    public long getLong(int pos) {
-        return paimonRow.getLong(pos);
-    }
-
-    @Override
-    public float getFloat(int pos) {
-        return paimonRow.getFloat(pos);
-    }
-
-    @Override
-    public double getDouble(int pos) {
-        return paimonRow.getDouble(pos);
-    }
-
-    @Override
-    public BinaryString getChar(int pos, int length) {
-        return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
-    }
-
-    @Override
-    public BinaryString getString(int pos) {
-        return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
-    }
-
-    @Override
-    public Decimal getDecimal(int pos, int precision, int scale) {
-        return Decimal.fromBigDecimal(
-                paimonRow.getDecimal(pos, precision, scale).toBigDecimal(), 
precision, scale);
-    }
-
-    @Override
-    public TimestampNtz getTimestampNtz(int pos, int precision) {
-        return TimestampNtz.fromLocalDateTime(
-                paimonRow.getTimestamp(pos, precision).toLocalDateTime());
-    }
-
-    @Override
-    public TimestampLtz getTimestampLtz(int pos, int precision) {
-        return TimestampLtz.fromInstant(paimonRow.getTimestamp(pos, 
precision).toInstant());
-    }
-
-    @Override
-    public byte[] getBinary(int pos, int length) {
-        return paimonRow.getBinary(pos);
-    }
-
-    @Override
-    public byte[] getBytes(int pos) {
-        return paimonRow.getBinary(pos);
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
deleted file mode 100644
index 95b66c33f..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
-import com.alibaba.fluss.client.table.scanner.log.LogScanner;
-import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
-import 
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.metadata.TableBucket;
-import com.alibaba.fluss.record.ChangeType;
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.KeyValueFileStore;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.reader.EmptyRecordReader;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.RowType;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/** A scanner to merge the paimon's snapshot and change log. */
-public class PaimonSnapshotAndLogSplitScanner implements BatchScanner {
-
-    private final TableRead tableRead;
-    private final PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit;
-    // the origin indexes of primary key in origin table
-    private final int[] pkIndexes;
-
-    // the indexes of primary key in emitted row by paimon and fluss
-    private int[] keyIndexesInRow;
-    private final Comparator<InternalRow> keyComparator;
-
-    // the sorted logs in memory, mapping from key -> value
-    private final SortedMap<InternalRow, KeyValueRow> logRows;
-
-    private final LogScanner logScanner;
-    private final long stoppingOffset;
-    private final RowType paimonRowType;
-
-    private boolean logScanFinished;
-    private SortMergeReader currentSortMergeReader;
-    private RecordReader<InternalRow> snapshotRecordReader;
-    @Nullable private int[] adjustProjectedFields;
-
-    public PaimonSnapshotAndLogSplitScanner(
-            Table flussTable,
-            FileStoreTable fileStoreTable,
-            PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit,
-            @Nullable int[] projectedFields) {
-        this.pkIndexes = 
flussTable.getTableInfo().getSchema().getPrimaryKeyIndexes();
-        this.paimonRowType = fileStoreTable.rowType();
-        int[] newProjectedFields = getNeedProjectFields(flussTable, 
projectedFields);
-        this.tableRead =
-                
fileStoreTable.newReadBuilder().withProjection(newProjectedFields).newRead();
-        this.snapshotAndFlussLogSplit = snapshotAndFlussLogSplit;
-        this.keyComparator = ((KeyValueFileStore) 
fileStoreTable.store()).newKeyComparator();
-        this.logRows = new TreeMap<>(keyComparator);
-        this.logScanner = 
flussTable.newScan().project(newProjectedFields).createLogScanner();
-
-        TableBucket tableBucket = snapshotAndFlussLogSplit.getTableBucket();
-        if (tableBucket.getPartitionId() != null) {
-            this.logScanner.subscribe(
-                    tableBucket.getPartitionId(),
-                    tableBucket.getBucket(),
-                    snapshotAndFlussLogSplit.getStartingOffset());
-        } else {
-            this.logScanner.subscribe(
-                    tableBucket.getBucket(), 
snapshotAndFlussLogSplit.getStartingOffset());
-        }
-
-        this.stoppingOffset =
-                snapshotAndFlussLogSplit
-                        .getStoppingOffset()
-                        .orElseThrow(
-                                () ->
-                                        new RuntimeException(
-                                                "StoppingOffset is null for 
split: "
-                                                        + 
snapshotAndFlussLogSplit));
-
-        // starting offset is greater than or equal to stoppingOffset, no any 
log need to scan
-        this.logScanFinished = snapshotAndFlussLogSplit.getStartingOffset() >= 
stoppingOffset;
-    }
-
-    @Override
-    @Nullable
-    public CloseableIterator<com.alibaba.fluss.row.InternalRow> 
pollBatch(Duration poolTimeOut)
-            throws IOException {
-        if (logScanFinished) {
-            if (currentSortMergeReader == null) {
-                currentSortMergeReader = createSortMergeReader();
-            }
-            return currentSortMergeReader.readBatch();
-        } else {
-            pollLogRecords(poolTimeOut);
-            return CloseableIterator.wrap(Collections.emptyIterator());
-        }
-    }
-
-    private int[] getNeedProjectFields(Table flussTable, @Nullable int[] 
originProjectedFields) {
-        if (originProjectedFields != null) {
-            // we need to include the primary key in projected fields to sort 
merge by pk
-            // if the provided don't include, we need to include it
-            List<Integer> newProjectedFields =
-                    
Arrays.stream(originProjectedFields).boxed().collect(Collectors.toList());
-
-            // the indexes of primary key with new projected fields
-            keyIndexesInRow = new int[pkIndexes.length];
-            for (int i = 0; i < pkIndexes.length; i++) {
-                int primaryKeyIndex = pkIndexes[i];
-                // search the pk in projected fields
-                int indexInProjectedFields = findIndex(originProjectedFields, 
primaryKeyIndex);
-                if (indexInProjectedFields >= 0) {
-                    keyIndexesInRow[i] = indexInProjectedFields;
-                } else {
-                    // no pk in projected fields, we must include it to do
-                    // merge sort
-                    newProjectedFields.add(primaryKeyIndex);
-                    keyIndexesInRow[i] = newProjectedFields.size() - 1;
-                }
-            }
-            int[] newProjection = 
newProjectedFields.stream().mapToInt(Integer::intValue).toArray();
-            // the underlying scan will use the new projection to scan data,
-            // but will still need to map from the new projection to the 
origin projected fields
-            int[] adjustProjectedFields = new 
int[originProjectedFields.length];
-            for (int i = 0; i < originProjectedFields.length; i++) {
-                adjustProjectedFields[i] = findIndex(newProjection, 
originProjectedFields[i]);
-            }
-            this.adjustProjectedFields = adjustProjectedFields;
-            return newProjection;
-        } else {
-            // no projectedFields, use all fields
-            keyIndexesInRow = pkIndexes;
-            return IntStream.range(0, 
flussTable.getTableInfo().getRowType().getFieldCount())
-                    .toArray();
-        }
-    }
-
-    private int findIndex(int[] array, int target) {
-        int index = -1;
-        for (int i = 0; i < array.length; i++) {
-            if (array[i] == target) {
-                index = i;
-                break;
-            }
-        }
-        return index;
-    }
-
-    private SortMergeReader createSortMergeReader() throws IOException {
-        FileStoreSourceSplit fileStoreSourceSplit = 
snapshotAndFlussLogSplit.getSnapshotSplit();
-        snapshotRecordReader =
-                fileStoreSourceSplit == null
-                        ? new EmptyRecordReader<>()
-                        : tableRead.createReader(fileStoreSourceSplit.split());
-        return new SortMergeReader(
-                adjustProjectedFields,
-                keyIndexesInRow,
-                snapshotRecordReader,
-                CloseableIterator.wrap(logRows.values().iterator()),
-                keyComparator);
-    }
-
-    private void pollLogRecords(Duration timeout) {
-        ScanRecords scanRecords = logScanner.poll(timeout);
-        for (ScanRecord scanRecord : scanRecords) {
-            InternalRow paimonRow = new ScanRecordWrapper(scanRecord, 
paimonRowType);
-            boolean isDelete =
-                    scanRecord.getChangeType() == ChangeType.DELETE
-                            || scanRecord.getChangeType() == 
ChangeType.UPDATE_BEFORE;
-            KeyValueRow keyValueRow = new KeyValueRow(keyIndexesInRow, 
paimonRow, isDelete);
-            InternalRow keyRow = keyValueRow.keyRow();
-            // upsert the key value row
-            logRows.put(keyRow, keyValueRow);
-            if (scanRecord.logOffset() >= stoppingOffset - 1) {
-                // has reached to the end
-                logScanFinished = true;
-                break;
-            }
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        try {
-            if (logScanner != null) {
-                logScanner.close();
-            }
-            if (snapshotRecordReader != null) {
-                snapshotRecordReader.close();
-                snapshotRecordReader = null;
-            }
-        } catch (Exception e) {
-            throw new IOException("Failed to close resources", e);
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
deleted file mode 100644
index 50a6c2a74..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Objects;
-
-/* This file is based on source code of Apache Paimon Project 
(https://paimon.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
-/** A scanner for reading paimon split. Most logic is copied from paimon. */
-public class PaimonSnapshotScanner implements BatchScanner {
-
-    private final TableRead tableRead;
-    @Nullable private LazyRecordReader currentReader;
-
-    public PaimonSnapshotScanner(TableRead tableRead, FileStoreSourceSplit 
fileStoreSourceSplit) {
-        this.tableRead = tableRead;
-        this.currentReader = new 
LazyRecordReader(fileStoreSourceSplit.split());
-    }
-
-    @Override
-    @Nullable
-    public CloseableIterator<com.alibaba.fluss.row.InternalRow> 
pollBatch(Duration timeout) {
-        try {
-            RecordReader.RecordIterator<InternalRow> nextBatch =
-                    
Objects.requireNonNull(currentReader).recordReader().readBatch();
-            if (nextBatch == null) {
-                return null;
-            } else {
-                return new PaimonRowIteratorWrapper(nextBatch);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() throws IOException {
-        LazyRecordReader recordReader = currentReader;
-        if (recordReader != null) {
-            if (recordReader.lazyRecordReader != null) {
-                recordReader.lazyRecordReader.close();
-            }
-            currentReader = null;
-        }
-    }
-
-    private static class PaimonRowIteratorWrapper
-            implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
-        private final RecordReader.RecordIterator<InternalRow> recordBatch;
-        private @Nullable InternalRow paimonRow;
-
-        public 
PaimonRowIteratorWrapper(RecordReader.RecordIterator<InternalRow> recordBatch) {
-            this.recordBatch = recordBatch;
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (paimonRow != null) {
-                return true;
-            }
-            try {
-                paimonRow = recordBatch.next();
-                return paimonRow != null;
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public com.alibaba.fluss.row.InternalRow next() {
-            PaimonRowWrapper wrapper = new PaimonRowWrapper(paimonRow);
-            paimonRow = null;
-            return wrapper;
-        }
-
-        @Override
-        public void close() {
-            recordBatch.releaseBatch();
-        }
-    }
-
-    /** Lazy to create {@link RecordReader} to improve performance for limit. 
*/
-    private class LazyRecordReader {
-        private final Split split;
-        private RecordReader<InternalRow> lazyRecordReader;
-
-        private LazyRecordReader(Split split) {
-            this.split = split;
-        }
-
-        public RecordReader<InternalRow> recordReader() throws IOException {
-            if (lazyRecordReader == null) {
-                lazyRecordReader = tableRead.createReader(split);
-            }
-            return lazyRecordReader;
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
deleted file mode 100644
index 47e844503..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.record.ChangeType;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-
-/** A wrapper of {@link ScanRecord} which bridges {@link ScanRecord} to 
Paimon' internal row. */
-public class ScanRecordWrapper implements InternalRow {
-
-    private final ChangeType changeType;
-    private final com.alibaba.fluss.row.InternalRow flussRow;
-    private final RowType rowType;
-
-    public ScanRecordWrapper(ScanRecord scanRecord, RowType rowType) {
-        this.changeType = scanRecord.getChangeType();
-        this.flussRow = scanRecord.getRow();
-        this.rowType = rowType;
-    }
-
-    @Override
-    public int getFieldCount() {
-        return flussRow.getFieldCount();
-    }
-
-    @Override
-    public RowKind getRowKind() {
-        switch (changeType) {
-            case INSERT:
-                return RowKind.INSERT;
-            case UPDATE_BEFORE:
-                return RowKind.UPDATE_BEFORE;
-            case UPDATE_AFTER:
-                return RowKind.UPDATE_AFTER;
-            case DELETE:
-                return RowKind.DELETE;
-            default:
-                throw new UnsupportedOperationException();
-        }
-    }
-
-    @Override
-    public void setRowKind(RowKind rowKind) {
-        // do nothing
-    }
-
-    @Override
-    public boolean isNullAt(int pos) {
-        return flussRow.isNullAt(pos);
-    }
-
-    @Override
-    public boolean getBoolean(int pos) {
-        return flussRow.getBoolean(pos);
-    }
-
-    @Override
-    public byte getByte(int pos) {
-        return flussRow.getByte(pos);
-    }
-
-    @Override
-    public short getShort(int pos) {
-        return flussRow.getShort(pos);
-    }
-
-    @Override
-    public int getInt(int pos) {
-        return flussRow.getInt(pos);
-    }
-
-    @Override
-    public long getLong(int pos) {
-        return flussRow.getLong(pos);
-    }
-
-    @Override
-    public float getFloat(int pos) {
-        return flussRow.getFloat(pos);
-    }
-
-    @Override
-    public double getDouble(int pos) {
-        return flussRow.getDouble(pos);
-    }
-
-    @Override
-    public BinaryString getString(int pos) {
-        return BinaryString.fromString(flussRow.getString(pos).toString());
-    }
-
-    @Override
-    public Decimal getDecimal(int pos, int precision, int scale) {
-        return Decimal.fromBigDecimal(
-                flussRow.getDecimal(pos, precision, scale).toBigDecimal(), 
precision, scale);
-    }
-
-    @Override
-    public Timestamp getTimestamp(int pos, int precision) {
-        DataType paimonTimestampType = rowType.getTypeAt(pos);
-        switch (paimonTimestampType.getTypeRoot()) {
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-                if (TimestampNtz.isCompact(precision)) {
-                    return Timestamp.fromEpochMillis(
-                            flussRow.getTimestampNtz(pos, 
precision).getMillisecond());
-                } else {
-                    TimestampNtz timestampNtz = flussRow.getTimestampNtz(pos, 
precision);
-                    return Timestamp.fromEpochMillis(
-                            timestampNtz.getMillisecond(), 
timestampNtz.getNanoOfMillisecond());
-                }
-
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                if (TimestampLtz.isCompact(precision)) {
-                    return Timestamp.fromEpochMillis(
-                            flussRow.getTimestampLtz(pos, 
precision).getEpochMillisecond());
-                } else {
-                    TimestampLtz timestampLtz = flussRow.getTimestampLtz(pos, 
precision);
-                    return Timestamp.fromEpochMillis(
-                            timestampLtz.getEpochMillisecond(),
-                            timestampLtz.getNanoOfMillisecond());
-                }
-            default:
-                throw new UnsupportedOperationException(
-                        "Unsupported data type to get timestamp: " + 
paimonTimestampType);
-        }
-    }
-
-    @Override
-    public byte[] getBinary(int pos) {
-        return flussRow.getBytes(pos);
-    }
-
-    @Override
-    public Variant getVariant(int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public InternalArray getArray(int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public InternalMap getMap(int pos) {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public InternalRow getRow(int pos, int pos1) {
-        throw new UnsupportedOperationException();
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
deleted file mode 100644
index ebe8f0977..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-/** A sort merge reader to merge paimon snapshot and fluss change log. */
-public class SortMergeReader {
-
-    // to project to pk row
-    private final ProjectedRow snapshotProjectedPkRow;
-    private final RecordReader<InternalRow> paimonReader;
-    private final Comparator<InternalRow> userKeyComparator;
-
-    private final SnapshotMergedRowIteratorWrapper 
snapshotMergedRowIteratorWrapper;
-    private final ChangeLogIteratorWrapper changeLogIteratorWrapper;
-    private @Nullable final ProjectedRow projectedRow;
-
-    private CloseableIterator<KeyValueRow> changeLogIterator;
-
-    SortMergeReader(
-            // origin projected fields
-            @Nullable int[] projectedFields,
-            // the pk index in paimon row
-            int[] pkIndexes,
-            RecordReader<InternalRow> paimonReader,
-            CloseableIterator<KeyValueRow> changeLogIterator,
-            Comparator<InternalRow> userKeyComparator) {
-        this.paimonReader = paimonReader;
-        this.changeLogIterator = changeLogIterator;
-        this.userKeyComparator = userKeyComparator;
-        this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
-
-        this.snapshotMergedRowIteratorWrapper = new 
SnapshotMergedRowIteratorWrapper();
-        this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
-
-        // to project to fields provided by user
-        this.projectedRow = projectedFields == null ? null : 
ProjectedRow.from(projectedFields);
-    }
-
-    @Nullable
-    public 
com.alibaba.fluss.utils.CloseableIterator<com.alibaba.fluss.row.InternalRow> 
readBatch()
-            throws IOException {
-        RecordReader.RecordIterator<InternalRow> nextBatch = 
paimonReader.readBatch();
-        // no any snapshot record, now, read log
-        if (nextBatch == null) {
-            return changeLogIterator.hasNext()
-                    //  wrap to scan record iterator
-                    ? changeLogIteratorWrapper.replace(changeLogIterator)
-                    : null;
-        } else {
-            RecordReader.RecordIterator<SortMergeRows> mergedRecordIterator =
-                    nextBatch.transform(this::sortMergeWithChangeLog);
-            // wrap to snapshot merged row
-            return 
snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
-        }
-    }
-
-    /**
-     * The IteratorWrapper to wrap Paimon's RecordReader.RecordIterator which 
emit the merged rows
-     * with paimon snapshot and fluss change log.
-     */
-    private class SnapshotMergedRowIteratorWrapper
-            implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
-        private RecordReader.RecordIterator<SortMergeRows> currentBatch;
-
-        // the merged row after advance currentBatch once
-        private @Nullable Iterator<InternalRow> currentMergedRows;
-
-        // the row to be returned
-        private @Nullable InternalRow returnedRow;
-
-        public SnapshotMergedRowIteratorWrapper replace(
-                RecordReader.RecordIterator<SortMergeRows> currentBatch) {
-            this.currentBatch = currentBatch;
-            this.returnedRow = null;
-            this.currentMergedRows = null;
-            return this;
-        }
-
-        @Override
-        public void close() {
-            currentBatch.releaseBatch();
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (returnedRow != null) {
-                return true;
-            }
-            try {
-                // if currentMergedRows is null, we need to get the next 
mergedRows
-                if (currentMergedRows == null) {
-                    SortMergeRows sortMergeRows = currentBatch.next();
-                    //  next mergedRows is not null and is not empty, set the 
currentMergedRows
-                    if (sortMergeRows != null && 
!sortMergeRows.mergedRows.isEmpty()) {
-                        currentMergedRows = 
sortMergeRows.mergedRows.iterator();
-                    }
-                }
-                // check whether has next row, if does, set the internalRow to 
returned in method
-                // next;
-                if (currentMergedRows != null && currentMergedRows.hasNext()) {
-                    returnedRow = currentMergedRows.next();
-                }
-                return returnedRow != null;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public com.alibaba.fluss.row.InternalRow next() {
-            InternalRow returnedRow =
-                    projectedRow == null
-                            ? this.returnedRow
-                            : projectedRow.replaceRow(this.returnedRow);
-            // now, we can set the internalRow to null,
-            // if no any row remain in current merged row, set the 
currentMergedRows to null
-            // to enable fetch next merged rows
-            this.returnedRow = null;
-            if (currentMergedRows != null && !currentMergedRows.hasNext()) {
-                currentMergedRows = null;
-            }
-            return new PaimonRowWrapper(returnedRow);
-        }
-    }
-
-    private class ChangeLogIteratorWrapper
-            implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
-        private CloseableIterator<KeyValueRow> changeLogRecordIterator;
-
-        public ChangeLogIteratorWrapper() {}
-
-        public ChangeLogIteratorWrapper replace(
-                CloseableIterator<KeyValueRow> changeLogRecordIterator) {
-            this.changeLogRecordIterator = changeLogRecordIterator;
-            return this;
-        }
-
-        @Override
-        public void close() {
-            if (changeLogRecordIterator != null) {
-                changeLogRecordIterator.close();
-            }
-        }
-
-        @Override
-        public boolean hasNext() {
-            return changeLogRecordIterator != null && 
changeLogRecordIterator.hasNext();
-        }
-
-        @Override
-        public com.alibaba.fluss.row.InternalRow next() {
-            InternalRow returnedRow = 
changeLogRecordIterator.next().valueRow();
-            if (projectedRow != null) {
-                returnedRow = projectedRow.replaceRow(returnedRow);
-            }
-            return new PaimonRowWrapper(returnedRow);
-        }
-    }
-
-    private SortMergeRows sortMergeWithChangeLog(InternalRow 
paimonSnapshotRow) {
-        // no log record, we return the snapshot record
-        if (!changeLogIterator.hasNext()) {
-            return new SortMergeRows(paimonSnapshotRow);
-        }
-        KeyValueRow logKeyValueRow = changeLogIterator.next();
-        // now, let's compare with the snapshot row with log row
-        int compareResult =
-                userKeyComparator.compare(
-                        snapshotProjectedPkRow.replaceRow(paimonSnapshotRow),
-                        logKeyValueRow.keyRow());
-        if (compareResult == 0) {
-            // record of snapshot is equal to log, but the log record is 
delete,
-            // we shouldn't emit record
-            if (logKeyValueRow.isDelete()) {
-                return SortMergeRows.EMPTY;
-            } else {
-                // return the log record
-                return new SortMergeRows(logKeyValueRow.valueRow());
-            }
-        }
-        // the snapshot record is less than the log record, emit the
-        // snapshot record
-        if (compareResult < 0) {
-            // need to put back the log record to log iterator to make the log 
record
-            // can be advanced again
-            changeLogIterator = addElementToHead(logKeyValueRow, 
changeLogIterator);
-            return new SortMergeRows(paimonSnapshotRow);
-        } else {
-            // snapshot record > log record
-            // we should emit the log record firsts; and still need to 
iterator changelog to find
-            // the first change log greater than the snapshot record
-            List<InternalRow> emitRows = new ArrayList<>();
-            emitRows.add(logKeyValueRow.valueRow());
-            boolean shouldEmitSnapshotRecord = true;
-            while (changeLogIterator.hasNext()) {
-                // get the next log record
-                logKeyValueRow = changeLogIterator.next();
-                // compare with the snapshot row,
-                compareResult =
-                        userKeyComparator.compare(
-                                
snapshotProjectedPkRow.replaceRow(paimonSnapshotRow),
-                                logKeyValueRow.keyRow());
-                // if snapshot record < the log record
-                if (compareResult < 0) {
-                    // we can break the loop
-                    changeLogIterator = addElementToHead(logKeyValueRow, 
changeLogIterator);
-                    break;
-                } else if (compareResult > 0) {
-                    // snapshot record > the log record
-                    // the log record should be emitted
-                    emitRows.add(logKeyValueRow.valueRow());
-                } else {
-                    // log record == snapshot record
-                    // the log record should be emitted if is not delete, but 
the snapshot record
-                    // shouldn't be emitted
-                    if (!logKeyValueRow.isDelete()) {
-                        emitRows.add(logKeyValueRow.valueRow());
-                    }
-                    shouldEmitSnapshotRecord = false;
-                }
-            }
-            if (shouldEmitSnapshotRecord) {
-                emitRows.add(paimonSnapshotRow);
-            }
-            return new SortMergeRows(emitRows);
-        }
-    }
-
-    private static class SortMergeRows {
-        private static final SortMergeRows EMPTY = new 
SortMergeRows(Collections.emptyList());
-
-        // the rows merge with change log, one snapshot row may advance 
multiple change log
-        private final List<InternalRow> mergedRows;
-
-        public SortMergeRows(List<InternalRow> mergedRows) {
-            this.mergedRows = mergedRows;
-        }
-
-        public SortMergeRows(InternalRow internalRow) {
-            this.mergedRows = Collections.singletonList(internalRow);
-        }
-    }
-
-    private <T> CloseableIterator<T> addElementToHead(
-            T firstElement, CloseableIterator<T> originElementIterator) {
-        if (originElementIterator instanceof SingleElementHeadIterator) {
-            SingleElementHeadIterator<T> singleElementHeadIterator =
-                    (SingleElementHeadIterator<T>) originElementIterator;
-            singleElementHeadIterator.set(firstElement, 
singleElementHeadIterator.inner);
-            return singleElementHeadIterator;
-        } else {
-            return new SingleElementHeadIterator<>(firstElement, 
originElementIterator);
-        }
-    }
-
-    private static class SingleElementHeadIterator<T> implements 
CloseableIterator<T> {
-        private T singleElement;
-        private CloseableIterator<T> inner;
-        private boolean singleElementReturned;
-
-        public SingleElementHeadIterator(T element, CloseableIterator<T> 
inner) {
-            this.singleElement = element;
-            this.inner = inner;
-            this.singleElementReturned = false;
-        }
-
-        public void set(T element, CloseableIterator<T> inner) {
-            this.singleElement = element;
-            this.inner = inner;
-            this.singleElementReturned = false;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return !singleElementReturned || inner.hasNext();
-        }
-
-        @Override
-        public T next() {
-            if (singleElementReturned) {
-                return inner.next();
-            }
-            singleElementReturned = true;
-            return singleElement;
-        }
-
-        @Override
-        public void close() {
-            inner.close();
-        }
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
deleted file mode 100644
index 4fb113403..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-import javax.annotation.Nullable;
-
-import java.util.Optional;
-
-/** A split mixing Paimon snapshot and Fluss log. */
-public class PaimonSnapshotAndFlussLogSplit extends SourceSplitBase {
-
-    public static final byte PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
-
-    // may be null when no snapshot data for the bucket
-    @Nullable private final FileStoreSourceSplit snapshotSplit;
-
-    /** The records to skip when reading the splits. */
-    private final long recordsToSkip;
-
-    private final long startingOffset;
-    private final long stoppingOffset;
-
-    public PaimonSnapshotAndFlussLogSplit(
-            TableBucket tableBucket,
-            @Nullable FileStoreSourceSplit snapshotSplit,
-            long startingOffset,
-            long stoppingOffset) {
-        this(tableBucket, null, snapshotSplit, startingOffset, stoppingOffset, 
0);
-    }
-
-    public PaimonSnapshotAndFlussLogSplit(
-            TableBucket tableBucket,
-            @Nullable String partitionName,
-            @Nullable FileStoreSourceSplit snapshotSplit,
-            long startingOffset,
-            long stoppingOffset) {
-        this(tableBucket, partitionName, snapshotSplit, startingOffset, 
stoppingOffset, 0);
-    }
-
-    public PaimonSnapshotAndFlussLogSplit(
-            TableBucket tableBucket,
-            @Nullable String partitionName,
-            @Nullable FileStoreSourceSplit snapshotSplit,
-            long startingOffset,
-            long stoppingOffset,
-            long recordsToSkip) {
-        super(tableBucket, partitionName);
-        this.snapshotSplit = snapshotSplit;
-        this.startingOffset = startingOffset;
-        this.stoppingOffset = stoppingOffset;
-        this.recordsToSkip = recordsToSkip;
-    }
-
-    public PaimonSnapshotAndFlussLogSplit updateWithRecordsToSkip(long 
recordsToSkip) {
-        return new PaimonSnapshotAndFlussLogSplit(
-                getTableBucket(),
-                getPartitionName(),
-                snapshotSplit,
-                startingOffset,
-                stoppingOffset,
-                recordsToSkip);
-    }
-
-    @Override
-    public boolean isLakeSplit() {
-        return true;
-    }
-
-    @Override
-    public byte splitKind() {
-        return PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
-    }
-
-    @Override
-    public String splitId() {
-        return toSplitId("paimon-hybrid-snapshot-log-", tableBucket);
-    }
-
-    @Nullable
-    public FileStoreSourceSplit getSnapshotSplit() {
-        return snapshotSplit;
-    }
-
-    public long getRecordsToSkip() {
-        return recordsToSkip;
-    }
-
-    public long getStartingOffset() {
-        return startingOffset;
-    }
-
-    public Optional<Long> getStoppingOffset() {
-        return stoppingOffset >= 0 ? Optional.of(stoppingOffset) : 
Optional.empty();
-    }
-
-    @Override
-    public String toString() {
-        return "PaimonSnapshotAndFlussLogSplit{"
-                + "tableBucket="
-                + tableBucket
-                + ", partitionName='"
-                + partitionName
-                + '\''
-                + ", snapshotSplit="
-                + snapshotSplit
-                + ", recordsToSkip="
-                + recordsToSkip
-                + ", startingOffset="
-                + startingOffset
-                + ", stoppingOffset="
-                + stoppingOffset
-                + '}';
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
deleted file mode 100644
index 0c9ae5b87..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitState;
-
-/** State of {@link PaimonSnapshotAndFlussLogSplit}. */
-public class PaimonSnapshotAndFlussLogSplitState extends SourceSplitState {
-
-    private final PaimonSnapshotAndFlussLogSplit 
paimonSnapshotAndFlussLogSplit;
-
-    /** The records to skip while reading a snapshot. */
-    private long recordsToSkip;
-
-    public PaimonSnapshotAndFlussLogSplitState(
-            PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit) {
-        super(paimonSnapshotAndFlussLogSplit);
-        this.paimonSnapshotAndFlussLogSplit = paimonSnapshotAndFlussLogSplit;
-        this.recordsToSkip = paimonSnapshotAndFlussLogSplit.getRecordsToSkip();
-    }
-
-    public void setRecordsToSkip(long recordsToSkip) {
-        this.recordsToSkip = recordsToSkip;
-    }
-
-    @Override
-    public PaimonSnapshotAndFlussLogSplit toSourceSplit() {
-        return 
paimonSnapshotAndFlussLogSplit.updateWithRecordsToSkip(recordsToSkip);
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
deleted file mode 100644
index 23f22f416..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-import javax.annotation.Nullable;
-
-import java.util.Objects;
-
-/** A split for reading a snapshot of paimon. */
-public class PaimonSnapshotSplit extends SourceSplitBase {
-
-    public static final byte PAIMON_SNAPSHOT_SPLIT_KIND = -1;
-
-    private final FileStoreSourceSplit fileStoreSourceSplit;
-
-    public PaimonSnapshotSplit(
-            TableBucket tableBucket,
-            @Nullable String partitionName,
-            FileStoreSourceSplit fileStoreSourceSplit) {
-        super(tableBucket, partitionName);
-        this.fileStoreSourceSplit = fileStoreSourceSplit;
-    }
-
-    @Override
-    public String splitId() {
-        return toSplitId("paimon-snapshot-" + fileStoreSourceSplit.splitId() + 
"-", tableBucket);
-    }
-
-    @Override
-    public byte splitKind() {
-        return PAIMON_SNAPSHOT_SPLIT_KIND;
-    }
-
-    @Override
-    public boolean isLakeSplit() {
-        return true;
-    }
-
-    public FileStoreSourceSplit getFileStoreSourceSplit() {
-        return fileStoreSourceSplit;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof PaimonSnapshotSplit)) {
-            return false;
-        }
-        if (!super.equals(o)) {
-            return false;
-        }
-        PaimonSnapshotSplit that = (PaimonSnapshotSplit) o;
-        return Objects.equals(fileStoreSourceSplit, that.fileStoreSourceSplit);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(super.hashCode(), fileStoreSourceSplit);
-    }
-
-    @Override
-    public String toString() {
-        return "PaimonSnapshotSplit{"
-                + "tableBucket="
-                + tableBucket
-                + ", partitionName='"
-                + partitionName
-                + '\''
-                + ", fileStoreSourceSplit="
-                + fileStoreSourceSplit
-                + '}';
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
deleted file mode 100644
index 4c2941c4b..000000000
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitState;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-/** The state of {@link PaimonSnapshotSplit}. */
-public class PaimonSnapshotSplitState extends SourceSplitState {
-
-    private final PaimonSnapshotSplit paimonSnapshotSplit;
-    /** The records to skip while reading a snapshot. */
-    private long recordsToSkip;
-
-    public PaimonSnapshotSplitState(PaimonSnapshotSplit paimonSnapshotSplit) {
-        super(paimonSnapshotSplit);
-        this.paimonSnapshotSplit = paimonSnapshotSplit;
-        this.recordsToSkip = 
paimonSnapshotSplit.getFileStoreSourceSplit().recordsToSkip();
-    }
-
-    public void setRecordsToSkip(long recordsToSkip) {
-        this.recordsToSkip = recordsToSkip;
-    }
-
-    @Override
-    public PaimonSnapshotSplit toSourceSplit() {
-        FileStoreSourceSplit fileStoreSourceSplit =
-                paimonSnapshotSplit
-                        .getFileStoreSourceSplit()
-                        .updateWithRecordsToSkip(recordsToSkip);
-        return new PaimonSnapshotSplit(
-                paimonSnapshotSplit.getTableBucket(),
-                paimonSnapshotSplit.getPartitionName(),
-                fileStoreSourceSplit);
-    }
-}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index ac18b5283..9dfe4a028 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -81,7 +81,6 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkSourceSplitReader.class);
     private final RowType sourceOutputType;
-    private final TablePath tablePath;
 
     // boundedSplits, kv snapshot split or lake snapshot split
     private final Queue<SourceSplitBase> boundedSplits;
@@ -127,7 +126,6 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
         this.connection = ConnectionFactory.createConnection(flussConf, 
flinkMetricRegistry);
         this.table = connection.getTable(tablePath);
         this.sourceOutputType = sourceOutputType;
-        this.tablePath = tablePath;
         this.boundedSplits = new ArrayDeque<>();
         this.subscribedBuckets = new HashMap<>();
         this.projectedFields = projectedFields;
@@ -222,8 +220,7 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     private LakeSplitReaderGenerator getLakeSplitReader() {
         if (lakeSplitReaderGenerator == null) {
             lakeSplitReaderGenerator =
-                    new LakeSplitReaderGenerator(
-                            table, tablePath, projectedFields, 
checkNotNull(lakeSource));
+                    new LakeSplitReaderGenerator(table, projectedFields, 
checkNotNull(lakeSource));
         }
         return lakeSplitReaderGenerator;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
index b190852fb..2796f16e8 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -28,7 +28,6 @@ import com.alibaba.fluss.metadata.TableBucket;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -55,7 +54,7 @@ class LakeSplitSerializerTest {
     private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
             new TestSimpleVersionedSerializer();
 
-    @Mock private TableBucket tableBucket = new TableBucket(0, 1L, 0);
+    private TableBucket tableBucket = new TableBucket(0, 1L, 0);
 
     private final LakeSplitSerializer serializer = new 
LakeSplitSerializer(sourceSplitSerializer);
 
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index aa565fbaa..0b1e3ca0d 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -318,7 +318,6 @@
                                         <!-- start exclude for metric -->
                                         
<exclude>com.alibaba.fluss.metrics.*</exclude>
                                         <!-- end exclude for metric -->
-                                        
<exclude>com.alibaba.fluss.flink.lakehouse.*</exclude>
                                         
<exclude>com.alibaba.fluss.flink.lake.*</exclude>
                                         
<exclude>com.alibaba.fluss.kafka.*</exclude>
                                         <!-- exclude for fluss-ci-tools -->


Reply via email to