This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new eb3b5f8ab [flink] Union read decouple with paimon for pk table (#1543)
eb3b5f8ab is described below
commit eb3b5f8abf80a39b1465a0afb2c7d821e658f03a
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Aug 18 18:27:38 2025 +0800
[flink] Union read decouple with paimon for pk table (#1543)
---
.../fluss/flink/lake/LakeRecordRecordEmitter.java | 5 +
.../fluss/flink/lake/LakeSplitGenerator.java | 119 +-----
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 15 +-
.../fluss/flink/lake/LakeSplitSerializer.java | 63 ++--
.../flink/lake/LakeSplitStateInitializer.java | 5 +-
.../fluss/flink/lake/reader/KeyValueRow.java | 48 +++
.../reader/LakeSnapshotAndLogSplitScanner.java | 252 +++++++++++++
.../fluss/flink/lake/reader/SortMergeReader.java | 417 +++++++++++++++++++++
.../lake/split/LakeSnapshotAndFlussLogSplit.java | 110 ++++++
.../state/LakeSnapshotAndFlussLogSplitState.java | 45 +++
.../fluss/flink/lake/LakeSplitSerializerTest.java | 176 +++++++++
.../flink/lake/reader/SortMergeReaderTest.java | 161 ++++++++
12 files changed, 1276 insertions(+), 140 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 207422ed8..9e6848864 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -18,6 +18,7 @@
package com.alibaba.fluss.flink.lake;
import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
@@ -52,6 +53,10 @@ public class LakeRecordRecordEmitter<OUT> {
} else if (splitState instanceof LakeSnapshotSplitState) {
((LakeSnapshotSplitState)
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+ } else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
+ ((LakeSnapshotAndFlussLogSplitState) splitState)
+ .setRecordsToSkip(recordAndPos.readRecordsCount());
+ sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
} else {
throw new UnsupportedOperationException(
"Unknown split state type: " + splitState.getClass());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
index 8c148ccf5..1d8198a1f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -19,8 +19,8 @@ package com.alibaba.fluss.flink.lake;
import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
import com.alibaba.fluss.flink.source.split.LogSplit;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
@@ -30,17 +30,6 @@ import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.InnerTableScan;
-
import javax.annotation.Nullable;
import java.util.ArrayList;
@@ -53,8 +42,6 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
-import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
/** A generator for lake splits. */
public class LakeSplitGenerator {
@@ -86,10 +73,6 @@ public class LakeSplitGenerator {
// get the file store
LakeSnapshot lakeSnapshotInfo =
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
- FileStoreTable fileStoreTable =
- getTable(
- lakeSnapshotInfo.getSnapshotId(),
-
extractLakeCatalogProperties(tableInfo.getProperties()));
boolean isLogTable = !tableInfo.hasPrimaryKey();
boolean isPartitioned = tableInfo.isPartitioned();
@@ -113,17 +96,13 @@ public class LakeSplitGenerator {
lakeSplits,
isLogTable,
lakeSnapshotInfo.getTableBucketsOffset(),
- partitionNameById,
- fileStoreTable);
+ partitionNameById);
} else {
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
lakeSplits.values().iterator().next();
// non-partitioned table
return generateNoPartitionedTableSplit(
- nonPartitionLakeSplits,
- isLogTable,
- lakeSnapshotInfo.getTableBucketsOffset(),
- fileStoreTable);
+ nonPartitionLakeSplits, isLogTable,
lakeSnapshotInfo.getTableBucketsOffset());
}
}
@@ -145,8 +124,7 @@ public class LakeSplitGenerator {
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
boolean isLogTable,
Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- Map<Long, String> partitionNameById,
- @Nullable FileStoreTable fileStoreTable)
+ Map<Long, String> partitionNameById)
throws Exception {
List<SourceSplitBase> splits = new ArrayList<>();
Map<String, Long> flussPartitionIdByName =
@@ -181,8 +159,7 @@ public class LakeSplitGenerator {
partitionName,
isLogTable,
tableBucketSnapshotLogOffset,
- bucketEndOffset,
- fileStoreTable));
+ bucketEndOffset));
} else {
// only lake data
@@ -216,8 +193,7 @@ public class LakeSplitGenerator {
isLogTable,
// pass empty map since we won't read lake splits
Collections.emptyMap(),
- bucketEndOffset,
- fileStoreTable));
+ bucketEndOffset));
}
return splits;
}
@@ -228,8 +204,7 @@ public class LakeSplitGenerator {
@Nullable String partitionName,
boolean isLogTable,
Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- Map<Integer, Long> bucketEndOffset,
- @Nullable FileStoreTable fileStoreTable) {
+ Map<Integer, Long> bucketEndOffset) {
List<SourceSplitBase> splits = new ArrayList<>();
if (isLogTable) {
if (lakeSplits != null) {
@@ -264,12 +239,9 @@ public class LakeSplitGenerator {
new TableBucket(tableInfo.getTableId(), partitionId,
bucket);
Long snapshotLogOffset =
tableBucketSnapshotLogOffset.get(tableBucket);
long stoppingOffset = bucketEndOffset.get(bucket);
- FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
-
splits.add(
generateSplitForPrimaryKeyTableBucket(
- fileStoreTable,
- splitGenerator,
+ lakeSplits != null ? lakeSplits.get(bucket) :
null,
tableBucket,
partitionName,
snapshotLogOffset,
@@ -295,83 +267,26 @@ public class LakeSplitGenerator {
}
private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
- FileStoreTable fileStoreTable,
- FileStoreSourceSplitGenerator splitGenerator,
+ @Nullable List<LakeSplit> lakeSplits,
TableBucket tableBucket,
@Nullable String partitionName,
@Nullable Long snapshotLogOffset,
long stoppingOffset) {
-
        // no snapshot data for this bucket or no corresponding log offset in this bucket,
        // can only scan from the change log
if (snapshotLogOffset == null || snapshotLogOffset < 0) {
- return new PaimonSnapshotAndFlussLogSplit(
+ return new LakeSnapshotAndFlussLogSplit(
tableBucket, partitionName, null, EARLIEST_OFFSET,
stoppingOffset);
}
- // then, generate a split contains
- // snapshot and change log so that we can merge change log and snapshot
- // to get the full data
- fileStoreTable =
- fileStoreTable.copy(
- Collections.singletonMap(
- CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
- // we set a max size to make sure only one
splits
- MemorySize.MAX_VALUE.toString()));
- InnerTableScan tableScan =
- fileStoreTable.newScan().withBucketFilter((b) -> b ==
tableBucket.getBucket());
-
- if (partitionName != null) {
- tableScan =
-
tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
- }
-
- List<FileStoreSourceSplit> fileStoreSourceSplits =
- splitGenerator.createSplits(tableScan.plan());
-
- checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key
table must be 1.");
- FileStoreSourceSplit fileStoreSourceSplit =
fileStoreSourceSplits.get(0);
- return new PaimonSnapshotAndFlussLogSplit(
- tableBucket,
- partitionName,
- fileStoreSourceSplit,
- snapshotLogOffset,
- stoppingOffset);
- }
-
- private Map<String, String> getPartitionSpec(
- FileStoreTable fileStoreTable, String partitionName) {
- List<String> partitionKeys = fileStoreTable.partitionKeys();
- checkState(
- partitionKeys.size() == 1,
- "Must only one partition key for paimon table %, but got %s,
the partition keys are: ",
- tableInfo.getTablePath(),
- partitionKeys.size(),
- partitionKeys);
- return Collections.singletonMap(partitionKeys.get(0), partitionName);
- }
-
- private FileStoreTable getTable(long snapshotId, Map<String, String>
catalogProperties)
- throws Exception {
- try (Catalog catalog =
-
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
- return (FileStoreTable)
- catalog.getTable(
- Identifier.create(
-
tableInfo.getTablePath().getDatabaseName(),
-
tableInfo.getTablePath().getTableName()))
- .copy(
- Collections.singletonMap(
- CoreOptions.SCAN_SNAPSHOT_ID.key(),
- String.valueOf(snapshotId)));
- }
+ return new LakeSnapshotAndFlussLogSplit(
+ tableBucket, partitionName, lakeSplits, snapshotLogOffset,
stoppingOffset);
}
private List<SourceSplitBase> generateNoPartitionedTableSplit(
Map<Integer, List<LakeSplit>> lakeSplits,
boolean isLogTable,
- Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- FileStoreTable fileStoreTable) {
+ Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
// iterate all bucket
// assume bucket is from 0 to bucket count
Map<Integer, Long> bucketEndOffset =
@@ -380,12 +295,6 @@ public class LakeSplitGenerator {
IntStream.range(0,
bucketCount).boxed().collect(Collectors.toList()),
bucketOffsetsRetriever);
return generateSplit(
- lakeSplits,
- null,
- null,
- isLogTable,
- tableBucketSnapshotLogOffset,
- bucketEndOffset,
- fileStoreTable);
+ lakeSplits, null, null, isLogTable,
tableBucketSnapshotLogOffset, bucketEndOffset);
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index 02e5e5e5b..410bef73d 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -18,7 +18,9 @@
package com.alibaba.fluss.flink.lake;
import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
@@ -73,7 +75,8 @@ public class LakeSplitReaderGenerator {
boundedSplits.add(split);
} else if (split instanceof LakeSnapshotSplit) {
boundedSplits.add(split);
- // TODO support primary key table in
https://github.com/apache/fluss/issues/1434
+ } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+ boundedSplits.add(split);
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
@@ -112,7 +115,15 @@ public class LakeSplitReaderGenerator {
new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
return new BoundedSplitReader(
lakeSnapshotScanner,
lakeSnapshotSplit.getRecordsToSplit());
- // TODO support primary key table in
https://github.com/apache/fluss/issues/1434
+ } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+ (LakeSnapshotAndFlussLogSplit) split;
+ LakeSnapshotAndLogSplitScanner lakeSnapshotAndLogSplitScanner =
+ new LakeSnapshotAndLogSplitScanner(
+ table, lakeSource, lakeSnapshotAndFlussLogSplit,
projectedFields);
+ return new BoundedSplitReader(
+ lakeSnapshotAndLogSplitScanner,
+ lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
index 19b6f29c7..0bfb897a6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
@@ -17,8 +17,8 @@
package com.alibaba.fluss.flink.lake;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.source.split.LogSplit;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
@@ -27,15 +27,15 @@ import com.alibaba.fluss.metadata.TableBucket;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
import static com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND;
-import static com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
/** A serializer for lake split. */
public class LakeSplitSerializer {
@@ -52,32 +52,30 @@ public class LakeSplitSerializer {
sourceSplitSerializer.serialize(((LakeSnapshotSplit)
split).getLakeSplit());
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
- } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
- // TODO support primary key table in
https://github.com/apache/fluss/issues/1434
- FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
- new FileStoreSourceSplitSerializer();
+ } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
// writing file store source split
- PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
- ((PaimonSnapshotAndFlussLogSplit) split);
- FileStoreSourceSplit fileStoreSourceSplit =
- paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
- if (fileStoreSourceSplit == null) {
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+ ((LakeSnapshotAndFlussLogSplit) split);
+ List<LakeSplit> lakeSplits =
lakeSnapshotAndFlussLogSplit.getLakeSplits();
+ if (lakeSplits == null) {
// no snapshot data for the bucket
out.writeBoolean(false);
} else {
out.writeBoolean(true);
- byte[] serializeBytes =
-
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
- out.writeInt(serializeBytes.length);
- out.write(serializeBytes);
+ out.writeInt(lakeSplits.size());
+ for (LakeSplit lakeSplit : lakeSplits) {
+ byte[] serializeBytes =
sourceSplitSerializer.serialize(lakeSplit);
+ out.writeInt(serializeBytes.length);
+ out.write(serializeBytes);
+ }
}
// writing starting/stopping offset
- out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
+ out.writeLong(lakeSnapshotAndFlussLogSplit.getStartingOffset());
out.writeLong(
- paimonSnapshotAndFlussLogSplit
+ lakeSnapshotAndFlussLogSplit
.getStoppingOffset()
.orElse(LogSplit.NO_STOPPING_OFFSET));
- out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+ out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
} else {
throw new UnsupportedOperationException(
"Unsupported split type: " + split.getClass().getName());
@@ -97,25 +95,26 @@ public class LakeSplitSerializer {
sourceSplitSerializer.deserialize(
sourceSplitSerializer.getVersion(),
serializeBytes);
return new LakeSnapshotSplit(tableBucket, partition,
fileStoreSourceSplit);
- // TODO support primary key table in
https://github.com/apache/fluss/issues/1434
- } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
- FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
- new FileStoreSourceSplitSerializer();
- FileStoreSourceSplit fileStoreSourceSplit = null;
+ } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
+ List<LakeSplit> lakeSplits = null;
if (input.readBoolean()) {
- byte[] serializeBytes = new byte[input.readInt()];
- input.read(serializeBytes);
- fileStoreSourceSplit =
- fileStoreSourceSplitSerializer.deserialize(
- fileStoreSourceSplitSerializer.getVersion(),
serializeBytes);
+ int lakeSplitSize = input.readInt();
+ lakeSplits = new ArrayList<>(lakeSplitSize);
+ for (int i = 0; i < lakeSplitSize; i++) {
+ byte[] serializeBytes = new byte[input.readInt()];
+ input.read(serializeBytes);
+ lakeSplits.add(
+ sourceSplitSerializer.deserialize(
+ sourceSplitSerializer.getVersion(),
serializeBytes));
+ }
}
long startingOffset = input.readLong();
long stoppingOffset = input.readLong();
long recordsToSkip = input.readLong();
- return new PaimonSnapshotAndFlussLogSplit(
+ return new LakeSnapshotAndFlussLogSplit(
tableBucket,
partition,
- fileStoreSourceSplit,
+ lakeSplits,
startingOffset,
stoppingOffset,
recordsToSkip);
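
For readers following the serializer change: the body written for a
LakeSnapshotAndFlussLogSplit by the branches above is, roughly (field order as
in the code; the per-split bytes come from the lake's own
SimpleVersionedSerializer<LakeSplit>):

    boolean hasLakeSplits
    int     lakeSplitCount                    (only when hasLakeSplits is true)
    repeat  { int length; byte[] lakeSplitBytes }
    long    startingOffset
    long    stoppingOffset                    (NO_STOPPING_OFFSET when absent)
    long    recordsToSkip

The split kind byte, table bucket and partition are passed in from the
enclosing split serializer (see the deserialize signature above) rather than
written here.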
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index aae8987ad..a8779bce0 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -17,7 +17,9 @@
package com.alibaba.fluss.flink.lake;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
@@ -32,7 +34,8 @@ public class LakeSplitStateInitializer {
return new
PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
} else if (split instanceof LakeSnapshotSplit) {
return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
- // TODO support primary key table in
https://github.com/apache/fluss/issues/1434
+ } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
+ return new
LakeSnapshotAndFlussLogSplitState((LakeSnapshotAndFlussLogSplit) split);
} else {
throw new UnsupportedOperationException("Unsupported split type: "
+ split);
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java
new file mode 100644
index 000000000..abb84a687
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/KeyValueRow.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+
+/** A row pairing an {@link InternalRow} value with its projected key part. */
+public class KeyValueRow {
+
+ private final boolean isDelete;
+ private final ProjectedRow pkRow;
+ private final InternalRow valueRow;
+
+ public KeyValueRow(int[] keyIndexes, InternalRow valueRow, boolean
isDelete) {
+ this.pkRow = ProjectedRow.from(keyIndexes).replaceRow(valueRow);
+ this.isDelete = isDelete;
+ this.valueRow = valueRow;
+ }
+
+ public boolean isDelete() {
+ return isDelete;
+ }
+
+ public InternalRow keyRow() {
+ return pkRow;
+ }
+
+ public InternalRow valueRow() {
+ return valueRow;
+ }
+}
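
A minimal usage sketch of KeyValueRow (illustrative values only, assuming a
three-column row with the primary key at column 0):

    GenericRow value = new GenericRow(3);
    value.setField(0, 1);
    value.setField(1, BinaryString.fromString("a"));
    value.setField(2, BinaryString.fromString("A"));
    KeyValueRow kv = new KeyValueRow(new int[] {0}, value, false /* isDelete */);
    InternalRow key = kv.keyRow();    // projected view exposing only column 0
    InternalRow full = kv.valueRow(); // the original row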
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
new file mode 100644
index 000000000..e3c835ee0
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotAndLogSplitScanner.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
+import com.alibaba.fluss.client.table.scanner.log.LogScanner;
+import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.lake.source.RecordReader;
+import com.alibaba.fluss.lake.source.SortedRecordReader;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** A scanner to merge the lakehouse's snapshot and change log. */
+public class LakeSnapshotAndLogSplitScanner implements BatchScanner {
+
+ private final LakeSnapshotAndFlussLogSplit
lakeSnapshotSplitAndFlussLogSplit;
+ private Comparator<InternalRow> rowComparator;
+ private List<CloseableIterator<LogRecord>> lakeRecordIterators = new
ArrayList<>();
+ private final LakeSource<LakeSplit> lakeSource;
+
+ private final int[] pkIndexes;
+
+    // the indexes of the primary key in the rows emitted by the lake source and fluss
+ private int[] keyIndexesInRow;
+ @Nullable private int[] adjustProjectedFields;
+ private final int[] newProjectedFields;
+
+ // the sorted logs in memory, mapping from key -> value
+ private Map<InternalRow, KeyValueRow> logRows;
+
+ private final LogScanner logScanner;
+ private final long stoppingOffset;
+ private boolean logScanFinished;
+
+ private SortMergeReader currentSortMergeReader;
+
+ public LakeSnapshotAndLogSplitScanner(
+ Table table,
+ LakeSource<LakeSplit> lakeSource,
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit,
+ @Nullable int[] projectedFields) {
+ this.pkIndexes =
table.getTableInfo().getSchema().getPrimaryKeyIndexes();
+ this.lakeSnapshotSplitAndFlussLogSplit = lakeSnapshotAndFlussLogSplit;
+ this.lakeSource = lakeSource;
+ this.newProjectedFields = getNeedProjectFields(table, projectedFields);
+
+ this.logScanner =
table.newScan().project(newProjectedFields).createLogScanner();
+ this.lakeSource.withProject(
+ Arrays.stream(newProjectedFields)
+ .mapToObj(field -> new int[] {field})
+ .toArray(int[][]::new));
+
+ TableBucket tableBucket =
lakeSnapshotAndFlussLogSplit.getTableBucket();
+ if (tableBucket.getPartitionId() != null) {
+ this.logScanner.subscribe(
+ tableBucket.getPartitionId(),
+ tableBucket.getBucket(),
+ lakeSnapshotAndFlussLogSplit.getStartingOffset());
+ } else {
+ this.logScanner.subscribe(
+ tableBucket.getBucket(),
lakeSnapshotAndFlussLogSplit.getStartingOffset());
+ }
+
+ this.stoppingOffset =
+ lakeSnapshotAndFlussLogSplit
+ .getStoppingOffset()
+ .orElseThrow(
+ () ->
+ new RuntimeException(
+ "StoppingOffset is null for
split: "
+ +
lakeSnapshotAndFlussLogSplit));
+
+ this.logScanFinished =
lakeSnapshotAndFlussLogSplit.getStartingOffset() >= stoppingOffset;
+ }
+
+ private int[] getNeedProjectFields(Table flussTable, @Nullable int[]
projectedFields) {
+ if (projectedFields != null) {
+            // we need to include the primary key in the projected fields to sort-merge by pk;
+            // if the provided projection doesn't include it, we add it
+ List<Integer> newProjectedFields =
+
Arrays.stream(projectedFields).boxed().collect(Collectors.toList());
+
+ // the indexes of primary key with new projected fields
+ keyIndexesInRow = new int[pkIndexes.length];
+ for (int i = 0; i < pkIndexes.length; i++) {
+ int primaryKeyIndex = pkIndexes[i];
+ // search the pk in projected fields
+ int indexInProjectedFields = findIndex(projectedFields,
primaryKeyIndex);
+ if (indexInProjectedFields >= 0) {
+ keyIndexesInRow[i] = indexInProjectedFields;
+ } else {
+ // no pk in projected fields, we must include it to do
+ // merge sort
+ newProjectedFields.add(primaryKeyIndex);
+ keyIndexesInRow[i] = newProjectedFields.size() - 1;
+ }
+ }
+ int[] newProjection =
newProjectedFields.stream().mapToInt(Integer::intValue).toArray();
+            // the underlying scan will use the new projection to scan data,
+            // but we still need to map from the new projection to the original projected fields
+ int[] adjustProjectedFields = new int[projectedFields.length];
+ for (int i = 0; i < projectedFields.length; i++) {
+ adjustProjectedFields[i] = findIndex(newProjection,
projectedFields[i]);
+ }
+ this.adjustProjectedFields = adjustProjectedFields;
+ return newProjection;
+ } else {
+ // no projectedFields, use all fields
+ keyIndexesInRow = pkIndexes;
+ return IntStream.range(0,
flussTable.getTableInfo().getRowType().getFieldCount())
+ .toArray();
+ }
+ }
+
+ private int findIndex(int[] array, int target) {
+ int index = -1;
+ for (int i = 0; i < array.length; i++) {
+ if (array[i] == target) {
+ index = i;
+ break;
+ }
+ }
+ return index;
+ }
+
+ @Nullable
+ @Override
+ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws
IOException {
+ if (logScanFinished) {
+ if (lakeRecordIterators.isEmpty()) {
+ if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+ ||
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+ lakeRecordIterators = Collections.emptyList();
+ } else {
+ for (LakeSplit lakeSplit :
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
+ lakeRecordIterators.add(
+ lakeSource.createRecordReader(() ->
lakeSplit).read());
+ }
+ }
+ }
+ if (currentSortMergeReader == null) {
+ currentSortMergeReader =
+ new SortMergeReader(
+ adjustProjectedFields,
+ keyIndexesInRow,
+ lakeRecordIterators,
+ rowComparator,
+ CloseableIterator.wrap(
+ logRows == null
+ ? Collections.emptyIterator()
+ :
logRows.values().iterator()));
+ }
+ return currentSortMergeReader.readBatch();
+ } else {
+ if (lakeRecordIterators.isEmpty()) {
+ if (lakeSnapshotSplitAndFlussLogSplit.getLakeSplits() == null
+ ||
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits().isEmpty()) {
+ lakeRecordIterators = Collections.emptyList();
+ logRows = new LinkedHashMap<>();
+ } else {
+ for (LakeSplit lakeSplit :
lakeSnapshotSplitAndFlussLogSplit.getLakeSplits()) {
+ RecordReader reader = lakeSource.createRecordReader(()
-> lakeSplit);
+ if (reader instanceof SortedRecordReader) {
+ rowComparator = ((SortedRecordReader)
reader).order();
+ } else {
+ throw new UnsupportedOperationException(
+                                "lake record reader must be an instance of SortedRecordReader.");
+ }
+ lakeRecordIterators.add(reader.read());
+ }
+ logRows = new TreeMap<>(rowComparator);
+ }
+ }
+ pollLogRecords(timeout);
+ return CloseableIterator.wrap(Collections.emptyIterator());
+ }
+ }
+
+ private void pollLogRecords(Duration timeout) {
+ ScanRecords scanRecords = logScanner.poll(timeout);
+ for (ScanRecord scanRecord : scanRecords) {
+ boolean isDelete =
+ scanRecord.getChangeType() == ChangeType.DELETE
+ || scanRecord.getChangeType() ==
ChangeType.UPDATE_BEFORE;
+ KeyValueRow keyValueRow =
+ new KeyValueRow(keyIndexesInRow, scanRecord.getRow(),
isDelete);
+ InternalRow keyRow = keyValueRow.keyRow();
+ // upsert the key value row
+ logRows.put(keyRow, keyValueRow);
+ if (scanRecord.logOffset() >= stoppingOffset - 1) {
+                // has reached the end
+ logScanFinished = true;
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ if (logScanner != null) {
+ logScanner.close();
+ }
+ if (lakeRecordIterators != null) {
+ for (CloseableIterator<LogRecord> iterator :
lakeRecordIterators) {
+ iterator.close();
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to close resources", e);
+ }
+ }
+}
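
A sketch of how a caller drives this scanner (illustrative; the loop below is
an assumption about the caller, not code from this change): pollBatch() returns
empty iterators while it is still draining the fluss change log into memory,
starts returning merged rows once the stopping offset is reached, and returns
null when the merged output is exhausted.

    CloseableIterator<InternalRow> batch;
    while ((batch = scanner.pollBatch(Duration.ofSeconds(1))) != null) {
        while (batch.hasNext()) {
            InternalRow row = batch.next(); // merged snapshot + changelog row
            // ... emit row downstream
        }
        batch.close();
    }
    scanner.close();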
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java
new file mode 100644
index 000000000..f1a52b623
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/SortMergeReader.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+
+/** A sort merge reader to merge lakehouse snapshot records and fluss change log records. */
+class SortMergeReader {
+
+ private final ProjectedRow snapshotProjectedPkRow;
+ private final CloseableIterator<LogRecord> lakeRecordIterator;
+ private final Comparator<InternalRow> userKeyComparator;
+ private CloseableIterator<KeyValueRow> changeLogIterator;
+
+ private final SnapshotMergedRowIteratorWrapper
snapshotMergedRowIteratorWrapper;
+
+ private final ChangeLogIteratorWrapper changeLogIteratorWrapper;
+ private @Nullable final ProjectedRow projectedRow;
+
+ public SortMergeReader(
+ @Nullable int[] projectedFields,
+ int[] pkIndexes,
+ List<CloseableIterator<LogRecord>> lakeRecordIterators,
+ Comparator<InternalRow> userKeyComparator,
+ CloseableIterator<KeyValueRow> changeLogIterator) {
+ this.userKeyComparator = userKeyComparator;
+ this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
+ this.lakeRecordIterator =
+ ConcatRecordIterator.wrap(lakeRecordIterators,
userKeyComparator, pkIndexes);
+ this.changeLogIterator = changeLogIterator;
+ this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
+ this.snapshotMergedRowIteratorWrapper = new
SnapshotMergedRowIteratorWrapper();
+ // to project to fields provided by user
+ this.projectedRow = projectedFields == null ? null :
ProjectedRow.from(projectedFields);
+ }
+
+ @Nullable
+ public CloseableIterator<InternalRow> readBatch() {
+ if (!lakeRecordIterator.hasNext()) {
+ return changeLogIterator.hasNext()
+ ? changeLogIteratorWrapper.replace(changeLogIterator)
+ : null;
+ } else {
+ CloseableIterator<SortMergeRows> mergedRecordIterator =
+ transform(lakeRecordIterator,
this::sortMergeWithChangeLog);
+
+ return
snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
+ }
+ }
+
+    /** A concat record iterator that concatenates multiple record iterators. */
+ private static class ConcatRecordIterator implements
CloseableIterator<LogRecord> {
+ private final PriorityQueue<SingleElementHeadIterator<LogRecord>>
priorityQueue;
+ private final ProjectedRow snapshotProjectedPkRow1;
+ private final ProjectedRow snapshotProjectedPkRow2;
+
+ public ConcatRecordIterator(
+ List<CloseableIterator<LogRecord>> iteratorList,
+ int[] pkIndexes,
+ Comparator<InternalRow> comparator) {
+ this.snapshotProjectedPkRow1 = ProjectedRow.from(pkIndexes);
+ this.snapshotProjectedPkRow2 = ProjectedRow.from(pkIndexes);
+ this.priorityQueue =
+ new PriorityQueue<>(
+ Math.max(1, iteratorList.size()),
+ (s1, s2) ->
+ comparator.compare(
+ getComparableRow(s1,
snapshotProjectedPkRow1),
+ getComparableRow(s2,
snapshotProjectedPkRow2)));
+ iteratorList.stream()
+ .filter(Iterator::hasNext)
+ .map(
+ iterator ->
+ SingleElementHeadIterator.addElementToHead(
+ iterator.next(), iterator))
+ .forEach(priorityQueue::add);
+ }
+
+ public static CloseableIterator<LogRecord> wrap(
+ List<CloseableIterator<LogRecord>> iteratorList,
+ Comparator<InternalRow> comparator,
+ int[] pkIndexes) {
+ if (iteratorList.isEmpty()) {
+ return CloseableIterator.wrap(Collections.emptyIterator());
+ }
+ return new ConcatRecordIterator(iteratorList, pkIndexes,
comparator);
+ }
+
+ private InternalRow getComparableRow(
+ SingleElementHeadIterator<LogRecord> iterator, ProjectedRow
projectedRow) {
+ return projectedRow.replaceRow(iterator.peek().getRow());
+ }
+
+ @Override
+ public void close() {
+ while (!priorityQueue.isEmpty()) {
+ priorityQueue.poll().close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ while (!priorityQueue.isEmpty()) {
+ CloseableIterator<LogRecord> iterator = priorityQueue.peek();
+ if (iterator.hasNext()) {
+ return true;
+ }
+ priorityQueue.poll().close();
+ }
+ return false;
+ }
+
+ @Override
+ public LogRecord next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return priorityQueue.peek().next();
+ }
+ }
+
+ private SortMergeRows sortMergeWithChangeLog(InternalRow lakeSnapshotRow) {
+ // no log record, we return the snapshot record
+ if (!changeLogIterator.hasNext()) {
+ return new SortMergeRows(lakeSnapshotRow);
+ }
+ KeyValueRow logKeyValueRow = changeLogIterator.next();
+        // now, let's compare the snapshot row with the log row
+ int compareResult =
+ userKeyComparator.compare(
+ snapshotProjectedPkRow.replaceRow(lakeSnapshotRow),
+ logKeyValueRow.keyRow());
+ if (compareResult == 0) {
+            // the snapshot record equals the log record; if the log record is a delete,
+            // we shouldn't emit the record
+ if (logKeyValueRow.isDelete()) {
+ return SortMergeRows.EMPTY;
+ } else {
+ // return the log record
+ return new SortMergeRows(logKeyValueRow.valueRow());
+ }
+ }
+
+ // the snapshot record is less than the log record, emit the
+ // snapshot record
+ if (compareResult < 0) {
+            // put the log record back into the log iterator so that it
+            // can be read again
+ changeLogIterator =
+ SingleElementHeadIterator.addElementToHead(logKeyValueRow,
changeLogIterator);
+ return new SortMergeRows(lakeSnapshotRow);
+ } else {
+ // snapshot record > log record
+            // we should emit the log record first, and still need to iterate the changelog to find
+            // the first change log record greater than the snapshot record
+ List<InternalRow> emitRows = new ArrayList<>();
+ emitRows.add(logKeyValueRow.valueRow());
+ boolean shouldEmitSnapshotRecord = true;
+ while (changeLogIterator.hasNext()) {
+ // get the next log record
+ logKeyValueRow = changeLogIterator.next();
+ // compare with the snapshot row,
+ compareResult =
+ userKeyComparator.compare(
+
snapshotProjectedPkRow.replaceRow(lakeSnapshotRow),
+ logKeyValueRow.keyRow());
+ // if snapshot record < the log record
+ if (compareResult < 0) {
+ // we can break the loop
+ changeLogIterator =
+ SingleElementHeadIterator.addElementToHead(
+ logKeyValueRow, changeLogIterator);
+ break;
+ } else if (compareResult > 0) {
+ // snapshot record > the log record
+ // the log record should be emitted
+ emitRows.add(logKeyValueRow.valueRow());
+ } else {
+ // log record == snapshot record
+                    // the log record should be emitted if it is not a delete, but the snapshot
+                    // record shouldn't be emitted
+ if (!logKeyValueRow.isDelete()) {
+ emitRows.add(logKeyValueRow.valueRow());
+ }
+ shouldEmitSnapshotRecord = false;
+ }
+ }
+
+ if (shouldEmitSnapshotRecord) {
+ emitRows.add(lakeSnapshotRow);
+ }
+ return new SortMergeRows(emitRows);
+ }
+ }
+
+ private static class SingleElementHeadIterator<T> implements
CloseableIterator<T> {
+ private T singleElement;
+ private CloseableIterator<T> inner;
+ private boolean singleElementReturned;
+
+ public SingleElementHeadIterator(T element, CloseableIterator<T>
inner) {
+ this.singleElement = element;
+ this.inner = inner;
+ this.singleElementReturned = false;
+ }
+
+ public static <T> SingleElementHeadIterator<T> addElementToHead(
+ T firstElement, CloseableIterator<T> originElementIterator) {
+ if (originElementIterator instanceof SingleElementHeadIterator) {
+ SingleElementHeadIterator<T> singleElementHeadIterator =
+ (SingleElementHeadIterator<T>) originElementIterator;
+ singleElementHeadIterator.set(firstElement,
singleElementHeadIterator.inner);
+ return singleElementHeadIterator;
+ } else {
+ return new SingleElementHeadIterator<>(firstElement,
originElementIterator);
+ }
+ }
+
+ public void set(T element, CloseableIterator<T> inner) {
+ this.singleElement = element;
+ this.inner = inner;
+ this.singleElementReturned = false;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !singleElementReturned || inner.hasNext();
+ }
+
+ @Override
+ public T next() {
+ if (singleElementReturned) {
+ return inner.next();
+ }
+ singleElementReturned = true;
+ return singleElement;
+ }
+
+ public T peek() {
+ if (singleElementReturned) {
+ this.singleElement = inner.next();
+ this.singleElementReturned = false;
+ return this.singleElement;
+ }
+ return singleElement;
+ }
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+ }
+
+ private static class ChangeLogIteratorWrapper implements
CloseableIterator<InternalRow> {
+ private CloseableIterator<KeyValueRow> changeLogRecordIterator;
+
+ public ChangeLogIteratorWrapper() {}
+
+ public ChangeLogIteratorWrapper replace(
+ CloseableIterator<KeyValueRow> changeLogRecordIterator) {
+ this.changeLogRecordIterator = changeLogRecordIterator;
+ return this;
+ }
+
+ @Override
+ public void close() {
+ if (changeLogRecordIterator != null) {
+ changeLogRecordIterator.close();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return changeLogRecordIterator != null &&
changeLogRecordIterator.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ return changeLogRecordIterator.next().valueRow();
+ }
+ }
+
+ private class SnapshotMergedRowIteratorWrapper implements
CloseableIterator<InternalRow> {
+ private CloseableIterator<SortMergeRows> currentLakeSnapshotRecords;
+
+ private @Nullable Iterator<InternalRow> currentMergedRows;
+
+ // the row to be returned
+ private @Nullable InternalRow returnedRow;
+
+ public SnapshotMergedRowIteratorWrapper replace(
+ CloseableIterator<SortMergeRows> currentLakeSnapshotRecords) {
+ this.currentLakeSnapshotRecords = currentLakeSnapshotRecords;
+ this.returnedRow = null;
+ this.currentMergedRows = null;
+ return this;
+ }
+
+ @Override
+ public void close() {
+ currentLakeSnapshotRecords.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (returnedRow != null) {
+ return true;
+ }
+ try {
+ // if currentMergedRows is null, we need to get the next
mergedRows
+ if (currentMergedRows == null) {
+ SortMergeRows sortMergeRows =
+ currentLakeSnapshotRecords.hasNext()
+ ? currentLakeSnapshotRecords.next()
+ : null;
+                    // if the next mergedRows is not null and not empty, set currentMergedRows
+ if (sortMergeRows != null &&
!sortMergeRows.mergedRows.isEmpty()) {
+ currentMergedRows =
sortMergeRows.mergedRows.iterator();
+ }
+ }
+                // check whether there is a next row; if so, set the row to be returned
+                // by next()
+ if (currentMergedRows != null && currentMergedRows.hasNext()) {
+ returnedRow = currentMergedRows.next();
+ }
+ return returnedRow != null;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public InternalRow next() {
+ InternalRow returnedRow =
+ projectedRow == null
+ ? this.returnedRow
+ : projectedRow.replaceRow(this.returnedRow);
+            // now, we can set the returned row to null;
+            // if no rows remain in the current merged rows, set currentMergedRows to null
+            // so that the next merged rows can be fetched
+ this.returnedRow = null;
+ if (currentMergedRows != null && !currentMergedRows.hasNext()) {
+ currentMergedRows = null;
+ }
+ return returnedRow;
+ }
+ }
+
+ private static class SortMergeRows {
+ private static final SortMergeRows EMPTY = new
SortMergeRows(Collections.emptyList());
+
+        // the rows merged with the change log; one snapshot row may advance multiple change log records
+ private final List<InternalRow> mergedRows;
+
+ public SortMergeRows(List<InternalRow> mergedRows) {
+ this.mergedRows = mergedRows;
+ }
+
+ public SortMergeRows(InternalRow internalRow) {
+ this.mergedRows = Collections.singletonList(internalRow);
+ }
+ }
+
+ private <R> CloseableIterator<R> transform(
+ CloseableIterator<LogRecord> originElementIterator,
+ final Function<InternalRow, R> function) {
+ return new CloseableIterator<R>() {
+ private final CloseableIterator<LogRecord> inner =
originElementIterator;
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public R next() {
+ LogRecord element = inner.next();
+ return function.apply(element.getRow());
+ }
+ };
+ }
+}
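
A worked example of the merge semantics above (illustrative, single int primary
key): with lake snapshot keys {1, 2, 4} and a sorted changelog of
{key 2 -> DELETE, key 3 -> INSERT}, readBatch() produces:

    snapshot key 1 -> emitted               (1 < first changelog key 2)
    snapshot key 2 -> suppressed            (equal keys and the changelog entry is a delete)
    snapshot key 4 -> changelog key 3 emitted first, then 4

so the merged output contains keys 1, 3 and 4. Changelog keys larger than every
snapshot key are drained on a later readBatch() call via ChangeLogIteratorWrapper.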
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
new file mode 100644
index 000000000..3c5917414
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.split;
+
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A split mixing Lake snapshot and Fluss log. */
+public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
+
+ public static final byte LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
+
+ // may be null when no snapshot data for the bucket
+ @Nullable private final List<LakeSplit> lakeSnapshotSplits;
+
+ /** The records to skip when reading the splits. */
+ private long recordOffset = 0;
+ // TODO: Support skip read file by record fileOffset
+
+ private final long startingOffset;
+ private final long stoppingOffset;
+
+ public LakeSnapshotAndFlussLogSplit(
+ TableBucket tableBucket,
+ @Nullable List<LakeSplit> snapshotSplits,
+ long startingOffset,
+ long stoppingOffset) {
+ this(tableBucket, null, snapshotSplits, startingOffset,
stoppingOffset, 0);
+ }
+
+ public LakeSnapshotAndFlussLogSplit(
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ @Nullable List<LakeSplit> snapshotSplits,
+ long startingOffset,
+ long stoppingOffset) {
+ this(tableBucket, partitionName, snapshotSplits, startingOffset,
stoppingOffset, 0);
+ }
+
+ public LakeSnapshotAndFlussLogSplit(
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ @Nullable List<LakeSplit> snapshotSplits,
+ long startingOffset,
+ long stoppingOffset,
+ long recordsToSkip) {
+ super(tableBucket, partitionName);
+ this.lakeSnapshotSplits = snapshotSplits;
+ this.startingOffset = startingOffset;
+ this.stoppingOffset = stoppingOffset;
+ this.recordOffset = recordsToSkip;
+ }
+
+ public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long
recordsToSkip) {
+ this.recordOffset = recordsToSkip;
+ return this;
+ }
+
+ public long getRecordsToSkip() {
+ return recordOffset;
+ }
+
+ public long getStartingOffset() {
+ return startingOffset;
+ }
+
+ public Optional<Long> getStoppingOffset() {
+ return stoppingOffset >= 0 ? Optional.of(stoppingOffset) :
Optional.empty();
+ }
+
+ @Override
+ public boolean isLakeSplit() {
+ return true;
+ }
+
+ protected byte splitKind() {
+ return LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+ }
+
+ @Override
+ public String splitId() {
+ return toSplitId("lake-hybrid-snapshot-log-", tableBucket);
+ }
+
+ public List<LakeSplit> getLakeSplits() {
+ return lakeSnapshotSplits;
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
new file mode 100644
index 000000000..65dda119f
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.state;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.flink.source.split.SourceSplitState;
+
+/** The state of {@link LakeSnapshotAndFlussLogSplit}. */
+public class LakeSnapshotAndFlussLogSplitState extends SourceSplitState {
+
+ private long recordsToSkip;
+ private final LakeSnapshotAndFlussLogSplit split;
+
+ public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit
split) {
+ super(split);
+ this.recordsToSkip = split.getRecordsToSkip();
+ this.split = split;
+ }
+
+ public void setRecordsToSkip(long recordsToSkip) {
+ this.recordsToSkip = recordsToSkip;
+ }
+
+ @Override
+ public SourceSplitBase toSourceSplit() {
+ return split.updateWithRecordsToSkip(recordsToSkip);
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
new file mode 100644
index 000000000..b190852fb
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit.LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test case for {@link LakeSplitSerializer}. */
+class LakeSplitSerializerTest {
+ private static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
+
+ private static final int SERIALIZER_VERSION = 3;
+
+ private static final byte[] TEST_DATA = "test-lake-split".getBytes();
+
+ private static final int STOPPING_OFFSET = 1024;
+
+ private static final LakeSplit LAKE_SPLIT =
+ new TestLakeSplit(0, Collections.singletonList("2025-08-18"));
+
+ private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
+ new TestSimpleVersionedSerializer();
+
+ @Mock private TableBucket tableBucket = new TableBucket(0, 1L, 0);
+
+ private final LakeSplitSerializer serializer = new
LakeSplitSerializer(sourceSplitSerializer);
+
+ @Test
+ void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
+ // Prepare test data
+ LakeSnapshotSplit originalSplit =
+ new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT);
+
+ DataOutputSerializer output = new
DataOutputSerializer(STOPPING_OFFSET);
+ serializer.serialize(output, originalSplit);
+
+ SourceSplitBase deserializedSplit =
+ serializer.deserialize(
+ LAKE_SNAPSHOT_SPLIT_KIND,
+ tableBucket,
+ "2025-08-18",
+ new DataInputDeserializer(output.getCopyOfBuffer()));
+
+ assertThat(deserializedSplit instanceof LakeSnapshotSplit).isTrue();
+ LakeSnapshotSplit result = (LakeSnapshotSplit) deserializedSplit;
+
+ assertThat(tableBucket).isEqualTo(result.getTableBucket());
+ assertThat("2025-08-18").isEqualTo(result.getPartitionName());
+ assertThat(LAKE_SPLIT).isEqualTo(result.getLakeSplit());
+ }
+
+ @Test
+ void testSerializeAndDeserializeLakeSnapshotAndFlussLogSplit() throws
IOException {
+ LakeSnapshotAndFlussLogSplit originalSplit =
+ new LakeSnapshotAndFlussLogSplit(
+ tableBucket,
+ "2025-08-18",
+ Collections.singletonList(LAKE_SPLIT),
+ EARLIEST_OFFSET,
+ STOPPING_OFFSET);
+
+ DataOutputSerializer output = new
DataOutputSerializer(STOPPING_OFFSET);
+ serializer.serialize(output, originalSplit);
+
+ SourceSplitBase deserializedSplit =
+ serializer.deserialize(
+ LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND,
+ tableBucket,
+ "2025-08-18",
+ new DataInputDeserializer(output.getCopyOfBuffer()));
+
+ assertThat(deserializedSplit instanceof
LakeSnapshotAndFlussLogSplit).isTrue();
+ LakeSnapshotAndFlussLogSplit result = (LakeSnapshotAndFlussLogSplit)
deserializedSplit;
+
+ assertThat(tableBucket).isEqualTo(result.getTableBucket());
+ assertThat("2025-08-18").isEqualTo(result.getPartitionName());
+
assertThat(Collections.singletonList(LAKE_SPLIT)).isEqualTo(result.getLakeSplits());
+ assertThat(EARLIEST_OFFSET).isEqualTo(result.getStartingOffset());
+ assertThat((long)
STOPPING_OFFSET).isEqualTo(result.getStoppingOffset().get());
+ }
+
+ @Test
+ void testDeserializeWithWrongSplitKind() throws IOException {
+ DataOutputSerializer output = new DataOutputSerializer(1024);
+ output.writeInt(0);
+
+ assertThatThrownBy(
+ () ->
+ serializer.deserialize(
+ (byte) 99,
+ tableBucket,
+ "2023-10-01",
+ new
DataInputDeserializer(output.getCopyOfBuffer())))
+ .withFailMessage(() -> "Unsupported split kind: ")
+ .isInstanceOf(UnsupportedOperationException.class);
+ }
+
+ private static class TestSimpleVersionedSerializer
+ implements SimpleVersionedSerializer<LakeSplit> {
+
+ @Override
+ public byte[] serialize(LakeSplit split) throws IOException {
+ return TEST_DATA;
+ }
+
+ @Override
+ public LakeSplit deserialize(int version, byte[] serialized) throws
IOException {
+ return LAKE_SPLIT;
+ }
+
+ @Override
+ public int getVersion() {
+ return SERIALIZER_VERSION;
+ }
+ }
+
+ private static class TestLakeSplit implements LakeSplit {
+
+ private int bucket;
+ private List<String> partition;
+
+ public TestLakeSplit(int bucket, List<String> partition) {
+ this.bucket = bucket;
+ this.partition = partition;
+ }
+
+ @Override
+ public String toString() {
+ return "TestLakeSplit";
+ }
+
+ @Override
+ public int bucket() {
+ return bucket;
+ }
+
+ @Override
+ public List<String> partition() {
+ return partition;
+ }
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java
new file mode 100644
index 000000000..dd6bc430d
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/reader/SortMergeReaderTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.record.ChangeType;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.row.ProjectedRow;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SortMergeReader}. */
+class SortMergeReaderTest {
+
+ private static class FlussRowComparator implements Comparator<InternalRow>
{
+
+ private final int keyIndex;
+
+ public FlussRowComparator(int keyIndex) {
+ this.keyIndex = keyIndex;
+ }
+
+ @Override
+ public int compare(InternalRow o1, InternalRow o2) {
+ return o1.getInt(keyIndex) - o2.getInt(keyIndex);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadBatch(boolean isProjected) {
+ int keyIndex = 0;
+ int[] pkIndexes = new int[] {keyIndex};
+ int[] projectedFields = isProjected ? new int[] {keyIndex, 1} : null;
+ List<LogRecord> logRecords1 = createRecords(0, 10, false);
+ List<LogRecord> logRecords2 = createRecords(10, 10, false);
+ List<LogRecord> logRecords3 = createRecords(20, 10, false);
+ List<KeyValueRow> logRecords4 =
+ createRecords(5, 20, true).stream()
+ .map(logRecord -> new KeyValueRow(pkIndexes,
logRecord.getRow(), false))
+ .collect(Collectors.toList());
+
+ SortMergeReader sortMergeReader =
+ new SortMergeReader(
+ projectedFields,
+ new int[] {keyIndex},
+ Arrays.asList(
+ CloseableIterator.wrap(logRecords2.iterator()),
+ CloseableIterator.wrap(logRecords3.iterator()),
+
CloseableIterator.wrap(logRecords1.iterator())),
+ new FlussRowComparator(keyIndex),
+ CloseableIterator.wrap(logRecords4.iterator()));
+
+ List<InternalRow> actualRows = new ArrayList<>();
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRow.createFieldGetters(
+ isProjected
+ ? RowType.of(new IntType(), new StringType())
+ : RowType.of(new IntType(), new StringType(),
new StringType()));
+ try (CloseableIterator<InternalRow> iterator =
sortMergeReader.readBatch()) {
+ actualRows.addAll(materializeRows(iterator, fieldGetters));
+ }
+ assertThat(actualRows).hasSize(30);
+
+ List<LogRecord> expectedLogRecords = createRecords(0, 5, false);
+ expectedLogRecords.addAll(createRecords(5, 20, true));
+ expectedLogRecords.addAll(createRecords(25, 5, false));
+ ProjectedRow projectedRow = isProjected ?
ProjectedRow.from(projectedFields) : null;
+ CloseableIterator<InternalRow> expected =
+ isProjected
+ ? projected(
+
CloseableIterator.wrap(expectedLogRecords.iterator()), projectedRow)
+ : CloseableIterator.wrap(
+
expectedLogRecords.stream().map(LogRecord::getRow).iterator());
+ assertThat(actualRows).isEqualTo(materializeRows(expected,
fieldGetters));
+ }
+
+ private CloseableIterator<InternalRow> projected(
+ CloseableIterator<LogRecord> originElementIterator, final
ProjectedRow projectedRow) {
+ return new CloseableIterator<InternalRow>() {
+ private final CloseableIterator<LogRecord> inner =
originElementIterator;
+
+ @Override
+ public void close() {
+ inner.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return inner.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ LogRecord element = inner.next();
+ return projectedRow.replaceRow(element.getRow());
+ }
+ };
+ }
+
+ private List<InternalRow> materializeRows(
+ CloseableIterator<InternalRow> iterator, InternalRow.FieldGetter[]
fieldGetters) {
+ List<InternalRow> actualRows = new ArrayList<>();
+ while (iterator != null && iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ GenericRow genericRow = new GenericRow(fieldGetters.length);
+ for (int i = 0; i < fieldGetters.length; i++) {
+ genericRow.setField(i, fieldGetters[i].getFieldOrNull(row));
+ }
+ actualRows.add(genericRow);
+ }
+ return actualRows;
+ }
+
+ private List<LogRecord> createRecords(int startId, int count, boolean
isLog) {
+ List<LogRecord> logRecords = new ArrayList<>();
+
+ for (int i = 0; i < count; i++) {
+ GenericRow row =
+ row(
+ startId + i,
+ BinaryString.fromString(isLog ? "a" + "_updated" :
"a"),
+ BinaryString.fromString(isLog ? "A" + "_updated" :
"A"));
+ logRecords.add(new ScanRecord(i, System.currentTimeMillis(),
ChangeType.INSERT, row));
+ }
+ return logRecords;
+ }
+}