This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 1a4857135 [flink] Remove legacy union read code for batch mode (#1563)
1a4857135 is described below
commit 1a48571354f450597b9dc17820999d7b41f614a8
Author: Junbo Wang <[email protected]>
AuthorDate: Tue Aug 19 10:29:00 2025 +0800
[flink] Remove legacy union read code for batch mode (#1563)
---
.../fluss/flink/lake/LakeRecordRecordEmitter.java | 12 +-
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 84 +-----
.../flink/lake/LakeSplitStateInitializer.java | 6 +-
.../fluss/flink/lakehouse/LakeSplitGenerator.java | 311 --------------------
.../fluss/flink/lakehouse/LakeSplitSerializer.java | 119 --------
.../flink/lakehouse/paimon/reader/KeyValueRow.java | 47 ---
.../lakehouse/paimon/reader/PaimonRowWrapper.java | 117 --------
.../reader/PaimonSnapshotAndLogSplitScanner.java | 227 ---------------
.../paimon/reader/PaimonSnapshotScanner.java | 127 --------
.../lakehouse/paimon/reader/ScanRecordWrapper.java | 180 ------------
.../lakehouse/paimon/reader/SortMergeReader.java | 322 ---------------------
.../split/PaimonSnapshotAndFlussLogSplit.java | 134 ---------
.../split/PaimonSnapshotAndFlussLogSplitState.java | 45 ---
.../paimon/split/PaimonSnapshotSplit.java | 95 ------
.../paimon/split/PaimonSnapshotSplitState.java | 52 ----
.../source/reader/FlinkSourceSplitReader.java | 5 +-
.../fluss/flink/lake/LakeSplitSerializerTest.java | 3 +-
fluss-test-coverage/pom.xml | 1 -
18 files changed, 6 insertions(+), 1881 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 9e6848864..5b1946985 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -20,8 +20,6 @@ package com.alibaba.fluss.flink.lake;
import com.alibaba.fluss.client.table.scanner.ScanRecord;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
import com.alibaba.fluss.flink.source.reader.RecordAndPos;
import com.alibaba.fluss.flink.source.split.SourceSplitState;
@@ -42,15 +40,7 @@ public class LakeRecordRecordEmitter<OUT> {
SourceSplitState splitState,
SourceOutput<OUT> sourceOutput,
RecordAndPos recordAndPos) {
- if (splitState instanceof PaimonSnapshotSplitState) {
- ((PaimonSnapshotSplitState) splitState)
- .setRecordsToSkip(recordAndPos.readRecordsCount());
- sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
- } else if (splitState instanceof PaimonSnapshotAndFlussLogSplitState) {
- ((PaimonSnapshotAndFlussLogSplitState) splitState)
- .setRecordsToSkip(recordAndPos.readRecordsCount());
- sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
- } else if (splitState instanceof LakeSnapshotSplitState) {
+ if (splitState instanceof LakeSnapshotSplitState) {
((LakeSnapshotSplitState)
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
} else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index 410bef73d..86a72f66c 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -22,58 +22,34 @@ import
com.alibaba.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
-import
com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
-import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
import com.alibaba.fluss.flink.source.reader.BoundedSplitReader;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.flink.utils.DataLakeUtils;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.source.LakeSplit;
-import com.alibaba.fluss.metadata.TablePath;
-
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.ReadBuilder;
import javax.annotation.Nullable;
import java.util.Queue;
-import java.util.stream.IntStream;
/** A generator to generate reader for lake split. */
public class LakeSplitReaderGenerator {
private final Table table;
- private final TablePath tablePath;
- private FileStoreTable fileStoreTable;
private final @Nullable int[] projectedFields;
private final @Nullable LakeSource<LakeSplit> lakeSource;
public LakeSplitReaderGenerator(
Table table,
- TablePath tablePath,
@Nullable int[] projectedFields,
@Nullable LakeSource<LakeSplit> lakeSource) {
this.table = table;
- this.tablePath = tablePath;
this.projectedFields = projectedFields;
this.lakeSource = lakeSource;
}
public void addSplit(SourceSplitBase split, Queue<SourceSplitBase>
boundedSplits) {
- if (split instanceof PaimonSnapshotSplit) {
- boundedSplits.add(split);
- } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
- boundedSplits.add(split);
- } else if (split instanceof LakeSnapshotSplit) {
+ if (split instanceof LakeSnapshotSplit) {
boundedSplits.add(split);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
boundedSplits.add(split);
@@ -84,32 +60,7 @@ public class LakeSplitReaderGenerator {
}
public BoundedSplitReader getBoundedSplitScanner(SourceSplitBase split) {
- if (split instanceof PaimonSnapshotSplit) {
- PaimonSnapshotSplit paimonSnapshotSplit = (PaimonSnapshotSplit)
split;
- FileStoreTable paimonStoreTable = getFileStoreTable();
- int[] projectedFields = getProjectedFieldsForPaimonTable(table);
- ReadBuilder readBuilder =
-
paimonStoreTable.newReadBuilder().withProjection(projectedFields);
- PaimonSnapshotScanner paimonSnapshotScanner =
- new PaimonSnapshotScanner(
- readBuilder.newRead(),
paimonSnapshotSplit.getFileStoreSourceSplit());
- return new BoundedSplitReader(
- paimonSnapshotScanner,
-
paimonSnapshotSplit.getFileStoreSourceSplit().recordsToSkip());
- } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
- PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
- (PaimonSnapshotAndFlussLogSplit) split;
- FileStoreTable paimonStoreTable = getFileStoreTable();
- PaimonSnapshotAndLogSplitScanner paimonSnapshotAndLogSplitScanner =
- new PaimonSnapshotAndLogSplitScanner(
- table,
- paimonStoreTable,
- paimonSnapshotAndFlussLogSplit,
- projectedFields);
- return new BoundedSplitReader(
- paimonSnapshotAndLogSplitScanner,
- paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
- } else if (split instanceof LakeSnapshotSplit) {
+ if (split instanceof LakeSnapshotSplit) {
LakeSnapshotSplit lakeSnapshotSplit = (LakeSnapshotSplit) split;
LakeSnapshotScanner lakeSnapshotScanner =
new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
@@ -129,35 +80,4 @@ public class LakeSplitReaderGenerator {
String.format("The split type of %s is not supported.",
split.getClass()));
}
}
-
- private int[] getProjectedFieldsForPaimonTable(Table flussTable) {
- return this.projectedFields != null
- ? this.projectedFields
- // only read the field in origin fluss table, not include
log_offset, log_timestamp
- // fields
- : IntStream.range(0,
flussTable.getTableInfo().getRowType().getFieldCount())
- .toArray();
- }
-
- private FileStoreTable getFileStoreTable() {
- if (fileStoreTable != null) {
- return fileStoreTable;
- }
-
- try (Catalog paimonCatalog =
- FlinkCatalogFactory.createPaimonCatalog(
- Options.fromMap(
- DataLakeUtils.extractLakeCatalogProperties(
-
table.getTableInfo().getProperties())))) {
- fileStoreTable =
- (FileStoreTable)
- paimonCatalog.getTable(
- Identifier.create(
- tablePath.getDatabaseName(),
tablePath.getTableName()));
- return fileStoreTable;
- } catch (Exception e) {
- throw new FlinkRuntimeException(
- "Fail to get paimon table.",
ExceptionUtils.stripExecutionException(e));
- }
- }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index a8779bce0..bac540257 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -21,8 +21,6 @@ import
com.alibaba.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.flink.source.split.SourceSplitState;
@@ -30,9 +28,7 @@ import com.alibaba.fluss.flink.source.split.SourceSplitState;
public class LakeSplitStateInitializer {
public static SourceSplitState initializedState(SourceSplitBase split) {
- if (split instanceof PaimonSnapshotAndFlussLogSplit) {
- return new
PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
- } else if (split instanceof LakeSnapshotSplit) {
+ if (split instanceof LakeSnapshotSplit) {
return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
return new
LakeSnapshotAndFlussLogSplitState((LakeSnapshotAndFlussLogSplit) split);
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
deleted file mode 100644
index 12d010f92..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitGenerator.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse;
-
-import com.alibaba.fluss.client.admin.Admin;
-import com.alibaba.fluss.client.metadata.LakeSnapshot;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import
com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
-import com.alibaba.fluss.flink.source.split.LogSplit;
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.PartitionInfo;
-import com.alibaba.fluss.metadata.TableBucket;
-import com.alibaba.fluss.metadata.TableInfo;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.FlinkCatalogFactory;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
-import org.apache.paimon.options.MemorySize;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.InnerTableScan;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static
com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
-import static
com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
-
-/**
- * A generator for lake splits.
- *
- * <p>todo: current is always assume is paimon, make it pluggable.
- */
-public class LakeSplitGenerator {
-
- private final TableInfo tableInfo;
- private final Admin flussAdmin;
- private final OffsetsInitializer.BucketOffsetsRetriever
bucketOffsetsRetriever;
- private final OffsetsInitializer stoppingOffsetInitializer;
- private final int bucketCount;
-
- public LakeSplitGenerator(
- TableInfo tableInfo,
- Admin flussAdmin,
- OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
- OffsetsInitializer stoppingOffsetInitializer,
- int bucketCount) {
- this.tableInfo = tableInfo;
- this.flussAdmin = flussAdmin;
- this.bucketOffsetsRetriever = bucketOffsetsRetriever;
- this.stoppingOffsetInitializer = stoppingOffsetInitializer;
- this.bucketCount = bucketCount;
- }
-
- public List<SourceSplitBase> generateLakeSplits() throws Exception {
- // get the file store
- LakeSnapshot lakeSnapshotInfo =
-
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
- FileStoreTable fileStoreTable =
- getTable(
- lakeSnapshotInfo.getSnapshotId(),
-
extractLakeCatalogProperties(tableInfo.getProperties()));
- boolean isLogTable = fileStoreTable.schema().primaryKeys().isEmpty();
- boolean isPartitioned =
!fileStoreTable.schema().partitionKeys().isEmpty();
-
- if (isPartitioned) {
- List<PartitionInfo> partitionInfos =
-
flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
- Map<Long, String> partitionNameById =
- partitionInfos.stream()
- .collect(
- Collectors.toMap(
- PartitionInfo::getPartitionId,
- PartitionInfo::getPartitionName));
- return generatePartitionTableSplit(
- isLogTable,
- fileStoreTable,
- lakeSnapshotInfo.getTableBucketsOffset(),
- partitionNameById);
- } else {
- // non-partitioned table
- return generateNoPartitionedTableSplit(
- isLogTable, fileStoreTable,
lakeSnapshotInfo.getTableBucketsOffset());
- }
- }
-
- private List<SourceSplitBase> generatePartitionTableSplit(
- boolean isLogTable,
- FileStoreTable fileStoreTable,
- Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- Map<Long, String> partitionNameById) {
- List<SourceSplitBase> splits = new ArrayList<>();
- for (Map.Entry<Long, String> partitionNameByIdEntry :
partitionNameById.entrySet()) {
- long partitionId = partitionNameByIdEntry.getKey();
- String partitionName = partitionNameByIdEntry.getValue();
- Map<Integer, Long> bucketEndOffset =
- stoppingOffsetInitializer.getBucketOffsets(
- partitionName,
- IntStream.range(0,
bucketCount).boxed().collect(Collectors.toList()),
- bucketOffsetsRetriever);
- splits.addAll(
- generateSplit(
- fileStoreTable,
- partitionId,
- partitionName,
- isLogTable,
- tableBucketSnapshotLogOffset,
- bucketEndOffset));
- }
- return splits;
- }
-
- private List<SourceSplitBase> generateSplit(
- FileStoreTable fileStoreTable,
- @Nullable Long partitionId,
- @Nullable String partitionName,
- boolean isLogTable,
- Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- Map<Integer, Long> bucketEndOffset) {
- List<SourceSplitBase> splits = new ArrayList<>();
- FileStoreSourceSplitGenerator splitGenerator = new
FileStoreSourceSplitGenerator();
- if (isLogTable) {
- // it's log table, we don't care about bucket, and we can't get
bucket in paimon's
- // dynamic bucket; so first generate split for the whole paimon
snapshot,
- // then generate log split for each bucket paimon snapshot + fluss
log
- splits.addAll(
- generateSplitForLogSnapshot(
- fileStoreTable, splitGenerator, partitionId,
partitionName));
- for (int bucket = 0; bucket < bucketCount; bucket++) {
- TableBucket tableBucket =
- new TableBucket(tableInfo.getTableId(), partitionId,
bucket);
- Long snapshotLogOffset =
tableBucketSnapshotLogOffset.get(tableBucket);
- long stoppingOffset = bucketEndOffset.get(bucket);
- if (snapshotLogOffset == null) {
- // no any data commit to this bucket, scan from fluss log
- splits.add(
- new LogSplit(
- tableBucket, partitionName,
EARLIEST_OFFSET, stoppingOffset));
- } else {
- // need to read remain fluss log
- if (snapshotLogOffset < stoppingOffset) {
- splits.add(
- new LogSplit(
- tableBucket,
- partitionName,
- snapshotLogOffset,
- stoppingOffset));
- }
- }
- }
- } else {
- // it's primary key table
- for (int bucket = 0; bucket < bucketCount; bucket++) {
- TableBucket tableBucket =
- new TableBucket(tableInfo.getTableId(), partitionId,
bucket);
- Long snapshotLogOffset =
tableBucketSnapshotLogOffset.get(tableBucket);
- long stoppingOffset = bucketEndOffset.get(bucket);
- splits.add(
- generateSplitForPrimaryKeyTableBucket(
- fileStoreTable,
- splitGenerator,
- tableBucket,
- partitionName,
- snapshotLogOffset,
- stoppingOffset));
- }
- }
-
- return splits;
- }
-
- private List<SourceSplitBase> generateSplitForLogSnapshot(
- FileStoreTable fileStoreTable,
- FileStoreSourceSplitGenerator splitGenerator,
- @Nullable Long partitionId,
- @Nullable String partitionName) {
- List<SourceSplitBase> splits = new ArrayList<>();
- // paimon snapshot
- InnerTableScan scan = fileStoreTable.newScan();
- if (partitionName != null) {
- scan = scan.withPartitionFilter(getPartitionSpec(fileStoreTable,
partitionName));
- }
- // for snapshot splits, we always use bucket = -1 ad the bucket since
we can't get bucket in
- // paimon's log table
- TableBucket tableBucket = new TableBucket(tableInfo.getTableId(),
partitionId, -1);
- // snapshot splits + one log split
- for (FileStoreSourceSplit fileStoreSourceSplit :
splitGenerator.createSplits(scan.plan())) {
- splits.add(new PaimonSnapshotSplit(tableBucket, partitionName,
fileStoreSourceSplit));
- }
- return splits;
- }
-
- private Map<String, String> getPartitionSpec(
- FileStoreTable fileStoreTable, String partitionName) {
- List<String> partitionKeys = fileStoreTable.partitionKeys();
- checkState(
- partitionKeys.size() == 1,
- "Must only one partition key for paimon table %, but got %s,
the partition keys are: ",
- tableInfo.getTablePath(),
- partitionKeys.size(),
- partitionKeys);
- return Collections.singletonMap(partitionKeys.get(0), partitionName);
- }
-
- private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
- FileStoreTable fileStoreTable,
- FileStoreSourceSplitGenerator splitGenerator,
- TableBucket tableBucket,
- @Nullable String partitionName,
- @Nullable Long snapshotLogOffset,
- long stoppingOffset) {
-
- // no snapshot data for this bucket or no a corresponding log offset
in this bucket,
- // can only scan from change log
- if (snapshotLogOffset == null || snapshotLogOffset < 0) {
- return new PaimonSnapshotAndFlussLogSplit(
- tableBucket, partitionName, null, EARLIEST_OFFSET,
stoppingOffset);
- }
-
- // then, generate a split contains
- // snapshot and change log so that we can merge change log and snapshot
- // to get the full data
- fileStoreTable =
- fileStoreTable.copy(
- Collections.singletonMap(
- CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
- // we set a max size to make sure only one
splits
- MemorySize.MAX_VALUE.toString()));
- InnerTableScan tableScan =
- fileStoreTable.newScan().withBucketFilter((b) -> b ==
tableBucket.getBucket());
-
- if (partitionName != null) {
- tableScan =
-
tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
- }
-
- List<FileStoreSourceSplit> fileStoreSourceSplits =
- splitGenerator.createSplits(tableScan.plan());
-
- checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key
table must be 1.");
- FileStoreSourceSplit fileStoreSourceSplit =
fileStoreSourceSplits.get(0);
- return new PaimonSnapshotAndFlussLogSplit(
- tableBucket,
- partitionName,
- fileStoreSourceSplit,
- snapshotLogOffset,
- stoppingOffset);
- }
-
- private List<SourceSplitBase> generateNoPartitionedTableSplit(
- boolean isLogTable,
- FileStoreTable fileStoreTable,
- Map<TableBucket, Long> tableBucketSnapshotLogOffset) {
- // iterate all bucket
- // assume bucket is from 0 to bucket count
- Map<Integer, Long> bucketEndOffset =
- stoppingOffsetInitializer.getBucketOffsets(
- null,
- IntStream.range(0,
bucketCount).boxed().collect(Collectors.toList()),
- bucketOffsetsRetriever);
- return generateSplit(
- fileStoreTable,
- null,
- null,
- isLogTable,
- tableBucketSnapshotLogOffset,
- bucketEndOffset);
- }
-
- private FileStoreTable getTable(long snapshotId, Map<String, String>
catalogProperties)
- throws Exception {
- try (Catalog catalog =
-
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
- return (FileStoreTable)
- catalog.getTable(
- Identifier.create(
-
tableInfo.getTablePath().getDatabaseName(),
-
tableInfo.getTablePath().getTableName()))
- .copy(
- Collections.singletonMap(
- CoreOptions.SCAN_SNAPSHOT_ID.key(),
- String.valueOf(snapshotId)));
- }
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
deleted file mode 100644
index 76c4e4452..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitSerializer.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse;
-
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import com.alibaba.fluss.flink.source.split.LogSplit;
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
-import static
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
-import static
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit.PAIMON_SNAPSHOT_SPLIT_KIND;
-
-/** A serializer for lake split. */
-public class LakeSplitSerializer {
-
- private final FileStoreSourceSplitSerializer
fileStoreSourceSplitSerializer =
- new FileStoreSourceSplitSerializer();
-
- public void serialize(DataOutputSerializer out, SourceSplitBase split)
throws IOException {
- FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
- new FileStoreSourceSplitSerializer();
- if (split instanceof PaimonSnapshotSplit) {
- FileStoreSourceSplit fileStoreSourceSplit =
- ((PaimonSnapshotSplit) split).getFileStoreSourceSplit();
- byte[] serializeBytes =
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
- out.writeInt(serializeBytes.length);
- out.write(serializeBytes);
- } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
- // writing file store source split
- PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
- ((PaimonSnapshotAndFlussLogSplit) split);
- FileStoreSourceSplit fileStoreSourceSplit =
- paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
- if (fileStoreSourceSplit == null) {
- // no snapshot data for the bucket
- out.writeBoolean(false);
- } else {
- out.writeBoolean(true);
- byte[] serializeBytes =
-
fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
- out.writeInt(serializeBytes.length);
- out.write(serializeBytes);
- }
- // writing starting/stopping offset
- out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
- out.writeLong(
- paimonSnapshotAndFlussLogSplit
- .getStoppingOffset()
- .orElse(LogSplit.NO_STOPPING_OFFSET));
- out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
- } else {
- throw new UnsupportedOperationException(
- "Unsupported split type: " + split.getClass().getName());
- }
- }
-
- public SourceSplitBase deserialize(
- byte splitKind,
- TableBucket tableBucket,
- @Nullable String partition,
- DataInputDeserializer input)
- throws IOException {
- if (splitKind == PAIMON_SNAPSHOT_SPLIT_KIND) {
- byte[] serializeBytes = new byte[input.readInt()];
- input.read(serializeBytes);
- FileStoreSourceSplit fileStoreSourceSplit =
- fileStoreSourceSplitSerializer.deserialize(
- fileStoreSourceSplitSerializer.getVersion(),
serializeBytes);
-
- return new PaimonSnapshotSplit(tableBucket, partition,
fileStoreSourceSplit);
- } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
- FileStoreSourceSplit fileStoreSourceSplit = null;
- if (input.readBoolean()) {
- byte[] serializeBytes = new byte[input.readInt()];
- input.read(serializeBytes);
- fileStoreSourceSplit =
- fileStoreSourceSplitSerializer.deserialize(
- fileStoreSourceSplitSerializer.getVersion(),
serializeBytes);
- }
- long startingOffset = input.readLong();
- long stoppingOffset = input.readLong();
- long recordsToSkip = input.readLong();
- return new PaimonSnapshotAndFlussLogSplit(
- tableBucket,
- partition,
- fileStoreSourceSplit,
- startingOffset,
- stoppingOffset,
- recordsToSkip);
- } else {
- throw new UnsupportedOperationException("Unsupported split kind: "
+ splitKind);
- }
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
deleted file mode 100644
index 86a7f74f8..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/KeyValueRow.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.utils.ProjectedRow;
-
-/** An {@link InternalRow} with the key part. */
-public class KeyValueRow {
-
- private final boolean isDelete;
- private final ProjectedRow pkRow;
- private final InternalRow valueRow;
-
- public KeyValueRow(int[] indexes, InternalRow valueRow, boolean isDelete) {
- this.pkRow = ProjectedRow.from(indexes).replaceRow(valueRow);
- this.valueRow = valueRow;
- this.isDelete = isDelete;
- }
-
- public boolean isDelete() {
- return isDelete;
- }
-
- public InternalRow keyRow() {
- return pkRow;
- }
-
- public InternalRow valueRow() {
- return valueRow;
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
deleted file mode 100644
index ad508adc7..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonRowWrapper.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.row.BinaryString;
-import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
-
-/**
- * A wrapper for Paimon's InternalRow to bridge the Paimon's InternalRow to
Fluss's InternalRow. .
- */
-public class PaimonRowWrapper implements InternalRow {
- private final org.apache.paimon.data.InternalRow paimonRow;
-
- public PaimonRowWrapper(org.apache.paimon.data.InternalRow paimonRow) {
- this.paimonRow = paimonRow;
- }
-
- @Override
- public int getFieldCount() {
- return paimonRow.getFieldCount();
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return paimonRow.isNullAt(pos);
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return paimonRow.getBoolean(pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return paimonRow.getByte(pos);
- }
-
- @Override
- public short getShort(int pos) {
- return paimonRow.getShort(pos);
- }
-
- @Override
- public int getInt(int pos) {
- return paimonRow.getInt(pos);
- }
-
- @Override
- public long getLong(int pos) {
- return paimonRow.getLong(pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return paimonRow.getFloat(pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return paimonRow.getDouble(pos);
- }
-
- @Override
- public BinaryString getChar(int pos, int length) {
- return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
- }
-
- @Override
- public BinaryString getString(int pos) {
- return BinaryString.fromBytes(paimonRow.getString(pos).toBytes());
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
- return Decimal.fromBigDecimal(
- paimonRow.getDecimal(pos, precision, scale).toBigDecimal(),
precision, scale);
- }
-
- @Override
- public TimestampNtz getTimestampNtz(int pos, int precision) {
- return TimestampNtz.fromLocalDateTime(
- paimonRow.getTimestamp(pos, precision).toLocalDateTime());
- }
-
- @Override
- public TimestampLtz getTimestampLtz(int pos, int precision) {
- return TimestampLtz.fromInstant(paimonRow.getTimestamp(pos,
precision).toInstant());
- }
-
- @Override
- public byte[] getBinary(int pos, int length) {
- return paimonRow.getBinary(pos);
- }
-
- @Override
- public byte[] getBytes(int pos) {
- return paimonRow.getBinary(pos);
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
deleted file mode 100644
index 95b66c33f..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotAndLogSplitScanner.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.Table;
-import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
-import com.alibaba.fluss.client.table.scanner.log.LogScanner;
-import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
-import
com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
-import com.alibaba.fluss.metadata.TableBucket;
-import com.alibaba.fluss.record.ChangeType;
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.KeyValueFileStore;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.reader.EmptyRecordReader;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.types.RowType;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/** A scanner to merge the paimon's snapshot and change log. */
-public class PaimonSnapshotAndLogSplitScanner implements BatchScanner {
-
- private final TableRead tableRead;
- private final PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit;
- // the origin indexes of primary key in origin table
- private final int[] pkIndexes;
-
- // the indexes of primary key in emitted row by paimon and fluss
- private int[] keyIndexesInRow;
- private final Comparator<InternalRow> keyComparator;
-
- // the sorted logs in memory, mapping from key -> value
- private final SortedMap<InternalRow, KeyValueRow> logRows;
-
- private final LogScanner logScanner;
- private final long stoppingOffset;
- private final RowType paimonRowType;
-
- private boolean logScanFinished;
- private SortMergeReader currentSortMergeReader;
- private RecordReader<InternalRow> snapshotRecordReader;
- @Nullable private int[] adjustProjectedFields;
-
- public PaimonSnapshotAndLogSplitScanner(
- Table flussTable,
- FileStoreTable fileStoreTable,
- PaimonSnapshotAndFlussLogSplit snapshotAndFlussLogSplit,
- @Nullable int[] projectedFields) {
- this.pkIndexes =
flussTable.getTableInfo().getSchema().getPrimaryKeyIndexes();
- this.paimonRowType = fileStoreTable.rowType();
- int[] newProjectedFields = getNeedProjectFields(flussTable,
projectedFields);
- this.tableRead =
-
fileStoreTable.newReadBuilder().withProjection(newProjectedFields).newRead();
- this.snapshotAndFlussLogSplit = snapshotAndFlussLogSplit;
- this.keyComparator = ((KeyValueFileStore)
fileStoreTable.store()).newKeyComparator();
- this.logRows = new TreeMap<>(keyComparator);
- this.logScanner =
flussTable.newScan().project(newProjectedFields).createLogScanner();
-
- TableBucket tableBucket = snapshotAndFlussLogSplit.getTableBucket();
- if (tableBucket.getPartitionId() != null) {
- this.logScanner.subscribe(
- tableBucket.getPartitionId(),
- tableBucket.getBucket(),
- snapshotAndFlussLogSplit.getStartingOffset());
- } else {
- this.logScanner.subscribe(
- tableBucket.getBucket(),
snapshotAndFlussLogSplit.getStartingOffset());
- }
-
- this.stoppingOffset =
- snapshotAndFlussLogSplit
- .getStoppingOffset()
- .orElseThrow(
- () ->
- new RuntimeException(
- "StoppingOffset is null for
split: "
- +
snapshotAndFlussLogSplit));
-
- // starting offset is greater than or equal to stoppingOffset, no any
log need to scan
- this.logScanFinished = snapshotAndFlussLogSplit.getStartingOffset() >=
stoppingOffset;
- }
-
- @Override
- @Nullable
- public CloseableIterator<com.alibaba.fluss.row.InternalRow>
pollBatch(Duration poolTimeOut)
- throws IOException {
- if (logScanFinished) {
- if (currentSortMergeReader == null) {
- currentSortMergeReader = createSortMergeReader();
- }
- return currentSortMergeReader.readBatch();
- } else {
- pollLogRecords(poolTimeOut);
- return CloseableIterator.wrap(Collections.emptyIterator());
- }
- }
-
- private int[] getNeedProjectFields(Table flussTable, @Nullable int[]
originProjectedFields) {
- if (originProjectedFields != null) {
- // we need to include the primary key in projected fields to sort
merge by pk
- // if the provided don't include, we need to include it
- List<Integer> newProjectedFields =
-
Arrays.stream(originProjectedFields).boxed().collect(Collectors.toList());
-
- // the indexes of primary key with new projected fields
- keyIndexesInRow = new int[pkIndexes.length];
- for (int i = 0; i < pkIndexes.length; i++) {
- int primaryKeyIndex = pkIndexes[i];
- // search the pk in projected fields
- int indexInProjectedFields = findIndex(originProjectedFields,
primaryKeyIndex);
- if (indexInProjectedFields >= 0) {
- keyIndexesInRow[i] = indexInProjectedFields;
- } else {
- // no pk in projected fields, we must include it to do
- // merge sort
- newProjectedFields.add(primaryKeyIndex);
- keyIndexesInRow[i] = newProjectedFields.size() - 1;
- }
- }
- int[] newProjection =
newProjectedFields.stream().mapToInt(Integer::intValue).toArray();
- // the underlying scan will use the new projection to scan data,
- // but will still need to map from the new projection to the
origin projected fields
- int[] adjustProjectedFields = new
int[originProjectedFields.length];
- for (int i = 0; i < originProjectedFields.length; i++) {
- adjustProjectedFields[i] = findIndex(newProjection,
originProjectedFields[i]);
- }
- this.adjustProjectedFields = adjustProjectedFields;
- return newProjection;
- } else {
- // no projectedFields, use all fields
- keyIndexesInRow = pkIndexes;
- return IntStream.range(0,
flussTable.getTableInfo().getRowType().getFieldCount())
- .toArray();
- }
- }
-
- private int findIndex(int[] array, int target) {
- int index = -1;
- for (int i = 0; i < array.length; i++) {
- if (array[i] == target) {
- index = i;
- break;
- }
- }
- return index;
- }
-
- private SortMergeReader createSortMergeReader() throws IOException {
- FileStoreSourceSplit fileStoreSourceSplit =
snapshotAndFlussLogSplit.getSnapshotSplit();
- snapshotRecordReader =
- fileStoreSourceSplit == null
- ? new EmptyRecordReader<>()
- : tableRead.createReader(fileStoreSourceSplit.split());
- return new SortMergeReader(
- adjustProjectedFields,
- keyIndexesInRow,
- snapshotRecordReader,
- CloseableIterator.wrap(logRows.values().iterator()),
- keyComparator);
- }
-
- private void pollLogRecords(Duration timeout) {
- ScanRecords scanRecords = logScanner.poll(timeout);
- for (ScanRecord scanRecord : scanRecords) {
- InternalRow paimonRow = new ScanRecordWrapper(scanRecord,
paimonRowType);
- boolean isDelete =
- scanRecord.getChangeType() == ChangeType.DELETE
- || scanRecord.getChangeType() ==
ChangeType.UPDATE_BEFORE;
- KeyValueRow keyValueRow = new KeyValueRow(keyIndexesInRow,
paimonRow, isDelete);
- InternalRow keyRow = keyValueRow.keyRow();
- // upsert the key value row
- logRows.put(keyRow, keyValueRow);
- if (scanRecord.logOffset() >= stoppingOffset - 1) {
- // has reached to the end
- logScanFinished = true;
- break;
- }
- }
- }
-
- @Override
- public void close() throws IOException {
- try {
- if (logScanner != null) {
- logScanner.close();
- }
- if (snapshotRecordReader != null) {
- snapshotRecordReader.close();
- snapshotRecordReader = null;
- }
- } catch (Exception e) {
- throw new IOException("Failed to close resources", e);
- }
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
deleted file mode 100644
index 50a6c2a74..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/PaimonSnapshotScanner.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.TableRead;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Objects;
-
-/* This file is based on source code of Apache Paimon Project
(https://paimon.apache.org/), licensed by the Apache
- * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
- * additional information regarding copyright ownership. */
-
-/** A scanner for reading paimon split. Most logic is copied from paimon. */
-public class PaimonSnapshotScanner implements BatchScanner {
-
- private final TableRead tableRead;
- @Nullable private LazyRecordReader currentReader;
-
- public PaimonSnapshotScanner(TableRead tableRead, FileStoreSourceSplit
fileStoreSourceSplit) {
- this.tableRead = tableRead;
- this.currentReader = new
LazyRecordReader(fileStoreSourceSplit.split());
- }
-
- @Override
- @Nullable
- public CloseableIterator<com.alibaba.fluss.row.InternalRow>
pollBatch(Duration timeout) {
- try {
- RecordReader.RecordIterator<InternalRow> nextBatch =
-
Objects.requireNonNull(currentReader).recordReader().readBatch();
- if (nextBatch == null) {
- return null;
- } else {
- return new PaimonRowIteratorWrapper(nextBatch);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() throws IOException {
- LazyRecordReader recordReader = currentReader;
- if (recordReader != null) {
- if (recordReader.lazyRecordReader != null) {
- recordReader.lazyRecordReader.close();
- }
- currentReader = null;
- }
- }
-
- private static class PaimonRowIteratorWrapper
- implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
- private final RecordReader.RecordIterator<InternalRow> recordBatch;
- private @Nullable InternalRow paimonRow;
-
- public
PaimonRowIteratorWrapper(RecordReader.RecordIterator<InternalRow> recordBatch) {
- this.recordBatch = recordBatch;
- }
-
- @Override
- public boolean hasNext() {
- if (paimonRow != null) {
- return true;
- }
- try {
- paimonRow = recordBatch.next();
- return paimonRow != null;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public com.alibaba.fluss.row.InternalRow next() {
- PaimonRowWrapper wrapper = new PaimonRowWrapper(paimonRow);
- paimonRow = null;
- return wrapper;
- }
-
- @Override
- public void close() {
- recordBatch.releaseBatch();
- }
- }
-
- /** Lazy to create {@link RecordReader} to improve performance for limit.
*/
- private class LazyRecordReader {
- private final Split split;
- private RecordReader<InternalRow> lazyRecordReader;
-
- private LazyRecordReader(Split split) {
- this.split = split;
- }
-
- public RecordReader<InternalRow> recordReader() throws IOException {
- if (lazyRecordReader == null) {
- lazyRecordReader = tableRead.createReader(split);
- }
- return lazyRecordReader;
- }
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
deleted file mode 100644
index 47e844503..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.record.ChangeType;
-import com.alibaba.fluss.row.TimestampLtz;
-import com.alibaba.fluss.row.TimestampNtz;
-
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.Decimal;
-import org.apache.paimon.data.InternalArray;
-import org.apache.paimon.data.InternalMap;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.variant.Variant;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.types.RowType;
-
-/** A wrapper of {@link ScanRecord} which bridges {@link ScanRecord} to
Paimon' internal row. */
-public class ScanRecordWrapper implements InternalRow {
-
- private final ChangeType changeType;
- private final com.alibaba.fluss.row.InternalRow flussRow;
- private final RowType rowType;
-
- public ScanRecordWrapper(ScanRecord scanRecord, RowType rowType) {
- this.changeType = scanRecord.getChangeType();
- this.flussRow = scanRecord.getRow();
- this.rowType = rowType;
- }
-
- @Override
- public int getFieldCount() {
- return flussRow.getFieldCount();
- }
-
- @Override
- public RowKind getRowKind() {
- switch (changeType) {
- case INSERT:
- return RowKind.INSERT;
- case UPDATE_BEFORE:
- return RowKind.UPDATE_BEFORE;
- case UPDATE_AFTER:
- return RowKind.UPDATE_AFTER;
- case DELETE:
- return RowKind.DELETE;
- default:
- throw new UnsupportedOperationException();
- }
- }
-
- @Override
- public void setRowKind(RowKind rowKind) {
- // do nothing
- }
-
- @Override
- public boolean isNullAt(int pos) {
- return flussRow.isNullAt(pos);
- }
-
- @Override
- public boolean getBoolean(int pos) {
- return flussRow.getBoolean(pos);
- }
-
- @Override
- public byte getByte(int pos) {
- return flussRow.getByte(pos);
- }
-
- @Override
- public short getShort(int pos) {
- return flussRow.getShort(pos);
- }
-
- @Override
- public int getInt(int pos) {
- return flussRow.getInt(pos);
- }
-
- @Override
- public long getLong(int pos) {
- return flussRow.getLong(pos);
- }
-
- @Override
- public float getFloat(int pos) {
- return flussRow.getFloat(pos);
- }
-
- @Override
- public double getDouble(int pos) {
- return flussRow.getDouble(pos);
- }
-
- @Override
- public BinaryString getString(int pos) {
- return BinaryString.fromString(flussRow.getString(pos).toString());
- }
-
- @Override
- public Decimal getDecimal(int pos, int precision, int scale) {
- return Decimal.fromBigDecimal(
- flussRow.getDecimal(pos, precision, scale).toBigDecimal(),
precision, scale);
- }
-
- @Override
- public Timestamp getTimestamp(int pos, int precision) {
- DataType paimonTimestampType = rowType.getTypeAt(pos);
- switch (paimonTimestampType.getTypeRoot()) {
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- if (TimestampNtz.isCompact(precision)) {
- return Timestamp.fromEpochMillis(
- flussRow.getTimestampNtz(pos,
precision).getMillisecond());
- } else {
- TimestampNtz timestampNtz = flussRow.getTimestampNtz(pos,
precision);
- return Timestamp.fromEpochMillis(
- timestampNtz.getMillisecond(),
timestampNtz.getNanoOfMillisecond());
- }
-
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- if (TimestampLtz.isCompact(precision)) {
- return Timestamp.fromEpochMillis(
- flussRow.getTimestampLtz(pos,
precision).getEpochMillisecond());
- } else {
- TimestampLtz timestampLtz = flussRow.getTimestampLtz(pos,
precision);
- return Timestamp.fromEpochMillis(
- timestampLtz.getEpochMillisecond(),
- timestampLtz.getNanoOfMillisecond());
- }
- default:
- throw new UnsupportedOperationException(
- "Unsupported data type to get timestamp: " +
paimonTimestampType);
- }
- }
-
- @Override
- public byte[] getBinary(int pos) {
- return flussRow.getBytes(pos);
- }
-
- @Override
- public Variant getVariant(int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public InternalArray getArray(int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public InternalMap getMap(int pos) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public InternalRow getRow(int pos, int pos1) {
- throw new UnsupportedOperationException();
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
deleted file mode 100644
index ebe8f0977..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/SortMergeReader.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.reader;
-
-import com.alibaba.fluss.utils.CloseableIterator;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.utils.ProjectedRow;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-
-/** A sort merge reader to merge paimon snapshot and fluss change log. */
-public class SortMergeReader {
-
- // to project to pk row
- private final ProjectedRow snapshotProjectedPkRow;
- private final RecordReader<InternalRow> paimonReader;
- private final Comparator<InternalRow> userKeyComparator;
-
- private final SnapshotMergedRowIteratorWrapper
snapshotMergedRowIteratorWrapper;
- private final ChangeLogIteratorWrapper changeLogIteratorWrapper;
- private @Nullable final ProjectedRow projectedRow;
-
- private CloseableIterator<KeyValueRow> changeLogIterator;
-
- SortMergeReader(
- // origin projected fields
- @Nullable int[] projectedFields,
- // the pk index in paimon row
- int[] pkIndexes,
- RecordReader<InternalRow> paimonReader,
- CloseableIterator<KeyValueRow> changeLogIterator,
- Comparator<InternalRow> userKeyComparator) {
- this.paimonReader = paimonReader;
- this.changeLogIterator = changeLogIterator;
- this.userKeyComparator = userKeyComparator;
- this.snapshotProjectedPkRow = ProjectedRow.from(pkIndexes);
-
- this.snapshotMergedRowIteratorWrapper = new
SnapshotMergedRowIteratorWrapper();
- this.changeLogIteratorWrapper = new ChangeLogIteratorWrapper();
-
- // to project to fields provided by user
- this.projectedRow = projectedFields == null ? null :
ProjectedRow.from(projectedFields);
- }
-
- @Nullable
- public
com.alibaba.fluss.utils.CloseableIterator<com.alibaba.fluss.row.InternalRow>
readBatch()
- throws IOException {
- RecordReader.RecordIterator<InternalRow> nextBatch =
paimonReader.readBatch();
- // no any snapshot record, now, read log
- if (nextBatch == null) {
- return changeLogIterator.hasNext()
- // wrap to scan record iterator
- ? changeLogIteratorWrapper.replace(changeLogIterator)
- : null;
- } else {
- RecordReader.RecordIterator<SortMergeRows> mergedRecordIterator =
- nextBatch.transform(this::sortMergeWithChangeLog);
- // wrap to snapshot merged row
- return
snapshotMergedRowIteratorWrapper.replace(mergedRecordIterator);
- }
- }
-
- /**
- * The IteratorWrapper to wrap Paimon's RecordReader.RecordIterator which
emit the merged rows
- * with paimon snapshot and fluss change log.
- */
- private class SnapshotMergedRowIteratorWrapper
- implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
- private RecordReader.RecordIterator<SortMergeRows> currentBatch;
-
- // the merged row after advance currentBatch once
- private @Nullable Iterator<InternalRow> currentMergedRows;
-
- // the row to be returned
- private @Nullable InternalRow returnedRow;
-
- public SnapshotMergedRowIteratorWrapper replace(
- RecordReader.RecordIterator<SortMergeRows> currentBatch) {
- this.currentBatch = currentBatch;
- this.returnedRow = null;
- this.currentMergedRows = null;
- return this;
- }
-
- @Override
- public void close() {
- currentBatch.releaseBatch();
- }
-
- @Override
- public boolean hasNext() {
- if (returnedRow != null) {
- return true;
- }
- try {
- // if currentMergedRows is null, we need to get the next
mergedRows
- if (currentMergedRows == null) {
- SortMergeRows sortMergeRows = currentBatch.next();
- // next mergedRows is not null and is not empty, set the
currentMergedRows
- if (sortMergeRows != null &&
!sortMergeRows.mergedRows.isEmpty()) {
- currentMergedRows =
sortMergeRows.mergedRows.iterator();
- }
- }
- // check whether has next row, if does, set the internalRow to
returned in method
- // next;
- if (currentMergedRows != null && currentMergedRows.hasNext()) {
- returnedRow = currentMergedRows.next();
- }
- return returnedRow != null;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public com.alibaba.fluss.row.InternalRow next() {
- InternalRow returnedRow =
- projectedRow == null
- ? this.returnedRow
- : projectedRow.replaceRow(this.returnedRow);
- // now, we can set the internalRow to null,
- // if no any row remain in current merged row, set the
currentMergedRows to null
- // to enable fetch next merged rows
- this.returnedRow = null;
- if (currentMergedRows != null && !currentMergedRows.hasNext()) {
- currentMergedRows = null;
- }
- return new PaimonRowWrapper(returnedRow);
- }
- }
-
- private class ChangeLogIteratorWrapper
- implements CloseableIterator<com.alibaba.fluss.row.InternalRow> {
- private CloseableIterator<KeyValueRow> changeLogRecordIterator;
-
- public ChangeLogIteratorWrapper() {}
-
- public ChangeLogIteratorWrapper replace(
- CloseableIterator<KeyValueRow> changeLogRecordIterator) {
- this.changeLogRecordIterator = changeLogRecordIterator;
- return this;
- }
-
- @Override
- public void close() {
- if (changeLogRecordIterator != null) {
- changeLogRecordIterator.close();
- }
- }
-
- @Override
- public boolean hasNext() {
- return changeLogRecordIterator != null &&
changeLogRecordIterator.hasNext();
- }
-
- @Override
- public com.alibaba.fluss.row.InternalRow next() {
- InternalRow returnedRow =
changeLogRecordIterator.next().valueRow();
- if (projectedRow != null) {
- returnedRow = projectedRow.replaceRow(returnedRow);
- }
- return new PaimonRowWrapper(returnedRow);
- }
- }
-
- private SortMergeRows sortMergeWithChangeLog(InternalRow
paimonSnapshotRow) {
- // no log record, we return the snapshot record
- if (!changeLogIterator.hasNext()) {
- return new SortMergeRows(paimonSnapshotRow);
- }
- KeyValueRow logKeyValueRow = changeLogIterator.next();
- // now, let's compare with the snapshot row with log row
- int compareResult =
- userKeyComparator.compare(
- snapshotProjectedPkRow.replaceRow(paimonSnapshotRow),
- logKeyValueRow.keyRow());
- if (compareResult == 0) {
- // record of snapshot is equal to log, but the log record is
delete,
- // we shouldn't emit record
- if (logKeyValueRow.isDelete()) {
- return SortMergeRows.EMPTY;
- } else {
- // return the log record
- return new SortMergeRows(logKeyValueRow.valueRow());
- }
- }
- // the snapshot record is less than the log record, emit the
- // snapshot record
- if (compareResult < 0) {
- // need to put back the log record to log iterator to make the log
record
- // can be advanced again
- changeLogIterator = addElementToHead(logKeyValueRow,
changeLogIterator);
- return new SortMergeRows(paimonSnapshotRow);
- } else {
- // snapshot record > log record
- // we should emit the log record firsts; and still need to
iterator changelog to find
- // the first change log greater than the snapshot record
- List<InternalRow> emitRows = new ArrayList<>();
- emitRows.add(logKeyValueRow.valueRow());
- boolean shouldEmitSnapshotRecord = true;
- while (changeLogIterator.hasNext()) {
- // get the next log record
- logKeyValueRow = changeLogIterator.next();
- // compare with the snapshot row,
- compareResult =
- userKeyComparator.compare(
-
snapshotProjectedPkRow.replaceRow(paimonSnapshotRow),
- logKeyValueRow.keyRow());
- // if snapshot record < the log record
- if (compareResult < 0) {
- // we can break the loop
- changeLogIterator = addElementToHead(logKeyValueRow,
changeLogIterator);
- break;
- } else if (compareResult > 0) {
- // snapshot record > the log record
- // the log record should be emitted
- emitRows.add(logKeyValueRow.valueRow());
- } else {
- // log record == snapshot record
- // the log record should be emitted if is not delete, but
the snapshot record
- // shouldn't be emitted
- if (!logKeyValueRow.isDelete()) {
- emitRows.add(logKeyValueRow.valueRow());
- }
- shouldEmitSnapshotRecord = false;
- }
- }
- if (shouldEmitSnapshotRecord) {
- emitRows.add(paimonSnapshotRow);
- }
- return new SortMergeRows(emitRows);
- }
- }
-
- private static class SortMergeRows {
- private static final SortMergeRows EMPTY = new
SortMergeRows(Collections.emptyList());
-
- // the rows merge with change log, one snapshot row may advance
multiple change log
- private final List<InternalRow> mergedRows;
-
- public SortMergeRows(List<InternalRow> mergedRows) {
- this.mergedRows = mergedRows;
- }
-
- public SortMergeRows(InternalRow internalRow) {
- this.mergedRows = Collections.singletonList(internalRow);
- }
- }
-
- private <T> CloseableIterator<T> addElementToHead(
- T firstElement, CloseableIterator<T> originElementIterator) {
- if (originElementIterator instanceof SingleElementHeadIterator) {
- SingleElementHeadIterator<T> singleElementHeadIterator =
- (SingleElementHeadIterator<T>) originElementIterator;
- singleElementHeadIterator.set(firstElement,
singleElementHeadIterator.inner);
- return singleElementHeadIterator;
- } else {
- return new SingleElementHeadIterator<>(firstElement,
originElementIterator);
- }
- }
-
- private static class SingleElementHeadIterator<T> implements
CloseableIterator<T> {
- private T singleElement;
- private CloseableIterator<T> inner;
- private boolean singleElementReturned;
-
- public SingleElementHeadIterator(T element, CloseableIterator<T>
inner) {
- this.singleElement = element;
- this.inner = inner;
- this.singleElementReturned = false;
- }
-
- public void set(T element, CloseableIterator<T> inner) {
- this.singleElement = element;
- this.inner = inner;
- this.singleElementReturned = false;
- }
-
- @Override
- public boolean hasNext() {
- return !singleElementReturned || inner.hasNext();
- }
-
- @Override
- public T next() {
- if (singleElementReturned) {
- return inner.next();
- }
- singleElementReturned = true;
- return singleElement;
- }
-
- @Override
- public void close() {
- inner.close();
- }
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
deleted file mode 100644
index 4fb113403..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplit.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-import javax.annotation.Nullable;
-
-import java.util.Optional;
-
-/** A split mixing Paimon snapshot and Fluss log. */
-public class PaimonSnapshotAndFlussLogSplit extends SourceSplitBase {
-
- public static final byte PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
-
- // may be null when no snapshot data for the bucket
- @Nullable private final FileStoreSourceSplit snapshotSplit;
-
- /** The records to skip when reading the splits. */
- private final long recordsToSkip;
-
- private final long startingOffset;
- private final long stoppingOffset;
-
- public PaimonSnapshotAndFlussLogSplit(
- TableBucket tableBucket,
- @Nullable FileStoreSourceSplit snapshotSplit,
- long startingOffset,
- long stoppingOffset) {
- this(tableBucket, null, snapshotSplit, startingOffset, stoppingOffset,
0);
- }
-
- public PaimonSnapshotAndFlussLogSplit(
- TableBucket tableBucket,
- @Nullable String partitionName,
- @Nullable FileStoreSourceSplit snapshotSplit,
- long startingOffset,
- long stoppingOffset) {
- this(tableBucket, partitionName, snapshotSplit, startingOffset,
stoppingOffset, 0);
- }
-
- public PaimonSnapshotAndFlussLogSplit(
- TableBucket tableBucket,
- @Nullable String partitionName,
- @Nullable FileStoreSourceSplit snapshotSplit,
- long startingOffset,
- long stoppingOffset,
- long recordsToSkip) {
- super(tableBucket, partitionName);
- this.snapshotSplit = snapshotSplit;
- this.startingOffset = startingOffset;
- this.stoppingOffset = stoppingOffset;
- this.recordsToSkip = recordsToSkip;
- }
-
- public PaimonSnapshotAndFlussLogSplit updateWithRecordsToSkip(long
recordsToSkip) {
- return new PaimonSnapshotAndFlussLogSplit(
- getTableBucket(),
- getPartitionName(),
- snapshotSplit,
- startingOffset,
- stoppingOffset,
- recordsToSkip);
- }
-
- @Override
- public boolean isLakeSplit() {
- return true;
- }
-
- @Override
- public byte splitKind() {
- return PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
- }
-
- @Override
- public String splitId() {
- return toSplitId("paimon-hybrid-snapshot-log-", tableBucket);
- }
-
- @Nullable
- public FileStoreSourceSplit getSnapshotSplit() {
- return snapshotSplit;
- }
-
- public long getRecordsToSkip() {
- return recordsToSkip;
- }
-
- public long getStartingOffset() {
- return startingOffset;
- }
-
- public Optional<Long> getStoppingOffset() {
- return stoppingOffset >= 0 ? Optional.of(stoppingOffset) :
Optional.empty();
- }
-
- @Override
- public String toString() {
- return "PaimonSnapshotAndFlussLogSplit{"
- + "tableBucket="
- + tableBucket
- + ", partitionName='"
- + partitionName
- + '\''
- + ", snapshotSplit="
- + snapshotSplit
- + ", recordsToSkip="
- + recordsToSkip
- + ", startingOffset="
- + startingOffset
- + ", stoppingOffset="
- + stoppingOffset
- + '}';
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
deleted file mode 100644
index 0c9ae5b87..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotAndFlussLogSplitState.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitState;
-
-/** State of {@link PaimonSnapshotAndFlussLogSplit}. */
-public class PaimonSnapshotAndFlussLogSplitState extends SourceSplitState {
-
- private final PaimonSnapshotAndFlussLogSplit
paimonSnapshotAndFlussLogSplit;
-
- /** The records to skip while reading a snapshot. */
- private long recordsToSkip;
-
- public PaimonSnapshotAndFlussLogSplitState(
- PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit) {
- super(paimonSnapshotAndFlussLogSplit);
- this.paimonSnapshotAndFlussLogSplit = paimonSnapshotAndFlussLogSplit;
- this.recordsToSkip = paimonSnapshotAndFlussLogSplit.getRecordsToSkip();
- }
-
- public void setRecordsToSkip(long recordsToSkip) {
- this.recordsToSkip = recordsToSkip;
- }
-
- @Override
- public PaimonSnapshotAndFlussLogSplit toSourceSplit() {
- return
paimonSnapshotAndFlussLogSplit.updateWithRecordsToSkip(recordsToSkip);
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
deleted file mode 100644
index 23f22f416..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplit.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitBase;
-import com.alibaba.fluss.metadata.TableBucket;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-import javax.annotation.Nullable;
-
-import java.util.Objects;
-
-/** A split for reading a snapshot of paimon. */
-public class PaimonSnapshotSplit extends SourceSplitBase {
-
- public static final byte PAIMON_SNAPSHOT_SPLIT_KIND = -1;
-
- private final FileStoreSourceSplit fileStoreSourceSplit;
-
- public PaimonSnapshotSplit(
- TableBucket tableBucket,
- @Nullable String partitionName,
- FileStoreSourceSplit fileStoreSourceSplit) {
- super(tableBucket, partitionName);
- this.fileStoreSourceSplit = fileStoreSourceSplit;
- }
-
- @Override
- public String splitId() {
- return toSplitId("paimon-snapshot-" + fileStoreSourceSplit.splitId() +
"-", tableBucket);
- }
-
- @Override
- public byte splitKind() {
- return PAIMON_SNAPSHOT_SPLIT_KIND;
- }
-
- @Override
- public boolean isLakeSplit() {
- return true;
- }
-
- public FileStoreSourceSplit getFileStoreSourceSplit() {
- return fileStoreSourceSplit;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof PaimonSnapshotSplit)) {
- return false;
- }
- if (!super.equals(o)) {
- return false;
- }
- PaimonSnapshotSplit that = (PaimonSnapshotSplit) o;
- return Objects.equals(fileStoreSourceSplit, that.fileStoreSourceSplit);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(super.hashCode(), fileStoreSourceSplit);
- }
-
- @Override
- public String toString() {
- return "PaimonSnapshotSplit{"
- + "tableBucket="
- + tableBucket
- + ", partitionName='"
- + partitionName
- + '\''
- + ", fileStoreSourceSplit="
- + fileStoreSourceSplit
- + '}';
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
deleted file mode 100644
index 4c2941c4b..000000000
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/split/PaimonSnapshotSplitState.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.alibaba.fluss.flink.lakehouse.paimon.split;
-
-import com.alibaba.fluss.flink.source.split.SourceSplitState;
-
-import org.apache.paimon.flink.source.FileStoreSourceSplit;
-
-/** The state of {@link PaimonSnapshotSplit}. */
-public class PaimonSnapshotSplitState extends SourceSplitState {
-
- private final PaimonSnapshotSplit paimonSnapshotSplit;
- /** The records to skip while reading a snapshot. */
- private long recordsToSkip;
-
- public PaimonSnapshotSplitState(PaimonSnapshotSplit paimonSnapshotSplit) {
- super(paimonSnapshotSplit);
- this.paimonSnapshotSplit = paimonSnapshotSplit;
- this.recordsToSkip =
paimonSnapshotSplit.getFileStoreSourceSplit().recordsToSkip();
- }
-
- public void setRecordsToSkip(long recordsToSkip) {
- this.recordsToSkip = recordsToSkip;
- }
-
- @Override
- public PaimonSnapshotSplit toSourceSplit() {
- FileStoreSourceSplit fileStoreSourceSplit =
- paimonSnapshotSplit
- .getFileStoreSourceSplit()
- .updateWithRecordsToSkip(recordsToSkip);
- return new PaimonSnapshotSplit(
- paimonSnapshotSplit.getTableBucket(),
- paimonSnapshotSplit.getPartitionName(),
- fileStoreSourceSplit);
- }
-}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index ac18b5283..9dfe4a028 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -81,7 +81,6 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
private static final Logger LOG =
LoggerFactory.getLogger(FlinkSourceSplitReader.class);
private final RowType sourceOutputType;
- private final TablePath tablePath;
// boundedSplits, kv snapshot split or lake snapshot split
private final Queue<SourceSplitBase> boundedSplits;
@@ -127,7 +126,6 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
this.connection = ConnectionFactory.createConnection(flussConf,
flinkMetricRegistry);
this.table = connection.getTable(tablePath);
this.sourceOutputType = sourceOutputType;
- this.tablePath = tablePath;
this.boundedSplits = new ArrayDeque<>();
this.subscribedBuckets = new HashMap<>();
this.projectedFields = projectedFields;
@@ -222,8 +220,7 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
private LakeSplitReaderGenerator getLakeSplitReader() {
if (lakeSplitReaderGenerator == null) {
lakeSplitReaderGenerator =
- new LakeSplitReaderGenerator(
- table, tablePath, projectedFields,
checkNotNull(lakeSource));
+ new LakeSplitReaderGenerator(table, projectedFields,
checkNotNull(lakeSource));
}
return lakeSplitReaderGenerator;
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
index b190852fb..2796f16e8 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -28,7 +28,6 @@ import com.alibaba.fluss.metadata.TableBucket;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.junit.jupiter.api.Test;
-import org.mockito.Mock;
import java.io.IOException;
import java.util.Collections;
@@ -55,7 +54,7 @@ class LakeSplitSerializerTest {
private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
new TestSimpleVersionedSerializer();
- @Mock private TableBucket tableBucket = new TableBucket(0, 1L, 0);
+ private TableBucket tableBucket = new TableBucket(0, 1L, 0);
private final LakeSplitSerializer serializer = new
LakeSplitSerializer(sourceSplitSerializer);
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index aa565fbaa..0b1e3ca0d 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -318,7 +318,6 @@
<!-- start exclude for metric -->
<exclude>com.alibaba.fluss.metrics.*</exclude>
<!-- end exclude for metric -->
-
<exclude>com.alibaba.fluss.flink.lakehouse.*</exclude>
<exclude>com.alibaba.fluss.flink.lake.*</exclude>
<exclude>com.alibaba.fluss.kafka.*</exclude>
<!-- exclude for fluss-ci-tools -->