This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 4cf486d68 [flink] Union read decouple with paimon for log table (#1527)
4cf486d68 is described below
commit 4cf486d6808e0babadade19f40812c2bf15d54a1
Author: CaoZhen <[email protected]>
AuthorDate: Wed Aug 13 04:50:20 2025 -0700
[flink] Union read decouple with paimon for log table (#1527)
---
.../alibaba/fluss/flink/catalog/FlinkCatalog.java | 2 +-
.../fluss/flink/catalog/FlinkTableFactory.java | 5 +-
.../flink/{lakehouse => lake}/LakeCatalog.java | 2 +-
.../LakeRecordRecordEmitter.java | 6 +-
.../fluss/flink/lake/LakeSplitGenerator.java | 391 +++++++++++++++++++++
.../LakeSplitReaderGenerator.java | 25 +-
.../fluss/flink/lake/LakeSplitSerializer.java | 126 +++++++
.../LakeSplitStateInitializer.java | 13 +-
.../{lakehouse => lake}/LakeTableFactory.java | 2 +-
.../flink/lake/reader/LakeSnapshotScanner.java | 97 +++++
.../fluss/flink/lake/split/LakeSnapshotSplit.java | 77 ++++
.../flink/lake/state/LakeSnapshotSplitState.java | 48 +++
.../alibaba/fluss/flink/source/FlinkSource.java | 44 ++-
.../fluss/flink/source/FlinkTableSource.java | 28 +-
.../flink/source/emitter/FlinkRecordEmitter.java | 2 +-
.../source/enumerator/FlinkSourceEnumerator.java | 41 ++-
.../flink/source/reader/FlinkSourceReader.java | 10 +-
.../source/reader/FlinkSourceSplitReader.java | 13 +-
.../flink/source/split/SourceSplitSerializer.java | 31 +-
.../fluss/flink/utils/FlinkConversions.java | 16 +
.../alibaba/fluss/flink/utils/LakeSourceUtils.java | 52 +++
.../enumerator/FlinkSourceEnumeratorTest.java | 3 +-
.../flink/source/reader/FlinkSourceReaderTest.java | 3 +-
.../source/reader/FlinkSourceSplitReaderTest.java | 8 +-
.../source/split/SourceSplitSerializerTest.java | 2 +-
fluss-test-coverage/pom.xml | 1 +
26 files changed, 993 insertions(+), 55 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
index 6dca5dca5..476b1aabf 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkCatalog.java
@@ -24,7 +24,7 @@ import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.exception.InvalidTableException;
-import com.alibaba.fluss.flink.lakehouse.LakeCatalog;
+import com.alibaba.fluss.flink.lake.LakeCatalog;
import com.alibaba.fluss.flink.procedure.ProcedureManager;
import com.alibaba.fluss.flink.utils.CatalogExceptionUtils;
import com.alibaba.fluss.flink.utils.DataLakeUtils;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
index e9b5cc313..c943aa5f3 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,7 +20,7 @@ package com.alibaba.fluss.flink.catalog;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.flink.FlinkConnectorOptions;
-import com.alibaba.fluss.flink.lakehouse.LakeTableFactory;
+import com.alibaba.fluss.flink.lake.LakeTableFactory;
import com.alibaba.fluss.flink.sink.FlinkTableSink;
import com.alibaba.fluss.flink.source.FlinkTableSource;
import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
@@ -143,7 +143,8 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
cache,
partitionDiscoveryIntervalMs,
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_ENABLED)),
- tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)));
+ tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
+ context.getCatalogTable().getOptions());
}
@Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
similarity index 98%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
index 3f42b8ff7..8c24e6e3c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeCatalog.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
similarity index 87%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
index 5133ddec4..207422ed8 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeRecordRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
import com.alibaba.fluss.client.table.scanner.ScanRecord;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
import com.alibaba.fluss.flink.source.reader.RecordAndPos;
@@ -48,6 +49,9 @@ public class LakeRecordRecordEmitter<OUT> {
((PaimonSnapshotAndFlussLogSplitState) splitState)
.setRecordsToSkip(recordAndPos.readRecordsCount());
sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
+ } else if (splitState instanceof LakeSnapshotSplitState) {
+ ((LakeSnapshotSplitState) splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
+ sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
} else {
throw new UnsupportedOperationException(
"Unknown split state type: " + splitState.getClass());
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
new file mode 100644
index 000000000..8c148ccf5
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableInfo;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitGenerator;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.InnerTableScan;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static com.alibaba.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static com.alibaba.fluss.flink.utils.DataLakeUtils.extractLakeCatalogProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A generator for lake splits. */
+public class LakeSplitGenerator {
+
+ private final TableInfo tableInfo;
+ private final Admin flussAdmin;
+ private final OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever;
+ private final OffsetsInitializer stoppingOffsetInitializer;
+ private final int bucketCount;
+
+ private final LakeSource<LakeSplit> lakeSource;
+
+ public LakeSplitGenerator(
+ TableInfo tableInfo,
+ Admin flussAdmin,
+ LakeSource<LakeSplit> lakeSource,
+ OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
+ OffsetsInitializer stoppingOffsetInitializer,
+ int bucketCount) {
+ this.tableInfo = tableInfo;
+ this.flussAdmin = flussAdmin;
+ this.lakeSource = lakeSource;
+ this.bucketOffsetsRetriever = bucketOffsetsRetriever;
+ this.stoppingOffsetInitializer = stoppingOffsetInitializer;
+ this.bucketCount = bucketCount;
+ }
+
+ public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
+ // get the file store
+ LakeSnapshot lakeSnapshotInfo =
+ flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+ FileStoreTable fileStoreTable =
+ getTable(
+ lakeSnapshotInfo.getSnapshotId(),
+ extractLakeCatalogProperties(tableInfo.getProperties()));
+
+ boolean isLogTable = !tableInfo.hasPrimaryKey();
+ boolean isPartitioned = tableInfo.isPartitioned();
+
+ Map<String, Map<Integer, List<LakeSplit>>> lakeSplits =
+ groupLakeSplits(
+ lakeSource
+ .createPlanner(
+ (LakeSource.PlannerContext) lakeSnapshotInfo::getSnapshotId)
+ .plan());
+ if (isPartitioned) {
+ List<PartitionInfo> partitionInfos =
+ flussAdmin.listPartitionInfos(tableInfo.getTablePath()).get();
+ Map<Long, String> partitionNameById =
+ partitionInfos.stream()
+ .collect(
+ Collectors.toMap(
+ PartitionInfo::getPartitionId,
+ PartitionInfo::getPartitionName));
+ return generatePartitionTableSplit(
+ lakeSplits,
+ isLogTable,
+ lakeSnapshotInfo.getTableBucketsOffset(),
+ partitionNameById,
+ fileStoreTable);
+ } else {
+ Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
+ lakeSplits.values().iterator().next();
+ // non-partitioned table
+ return generateNoPartitionedTableSplit(
+ nonPartitionLakeSplits,
+ isLogTable,
+ lakeSnapshotInfo.getTableBucketsOffset(),
+ fileStoreTable);
+ }
+ }
+
+ private Map<String, Map<Integer, List<LakeSplit>>> groupLakeSplits(List<LakeSplit> lakeSplits) {
+ Map<String, Map<Integer, List<LakeSplit>>> result = new HashMap<>();
+ for (LakeSplit split : lakeSplits) {
+ String partition = String.join("$", split.partition());
+ int bucket = split.bucket();
+ // Get or create the partition group
+ Map<Integer, List<LakeSplit>> bucketMap =
+ result.computeIfAbsent(partition, k -> new HashMap<>());
+ List<LakeSplit> splitList = bucketMap.computeIfAbsent(bucket, k -> new ArrayList<>());
+ splitList.add(split);
+ }
+ return result;
+ }
+
+ private List<SourceSplitBase> generatePartitionTableSplit(
+ Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
+ boolean isLogTable,
+ Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+ Map<Long, String> partitionNameById,
+ @Nullable FileStoreTable fileStoreTable)
+ throws Exception {
+ List<SourceSplitBase> splits = new ArrayList<>();
+ Map<String, Long> flussPartitionIdByName =
+ partitionNameById.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getValue,
+ Map.Entry::getKey,
+ (existing, replacement) -> existing,
+ LinkedHashMap::new));
+ long lakeSplitPartitionId = -1L;
+
+ // iterate lake splits
+ for (Map.Entry<String, Map<Integer, List<LakeSplit>>> lakeSplitEntry :
+ lakeSplits.entrySet()) {
+ String partitionName = lakeSplitEntry.getKey();
+ Map<Integer, List<LakeSplit>> lakeSplitsOfPartition = lakeSplitEntry.getValue();
+ Long partitionId = flussPartitionIdByName.remove(partitionName);
+ if (partitionId != null) {
+ // means the partition also exists in fluss
+ Map<Integer, Long> bucketEndOffset =
+ stoppingOffsetInitializer.getBucketOffsets(
+ partitionName,
+ IntStream.range(0, bucketCount)
+ .boxed()
+ .collect(Collectors.toList()),
+ bucketOffsetsRetriever);
+ splits.addAll(
+ generateSplit(
+ lakeSplitsOfPartition,
+ partitionId,
+ partitionName,
+ isLogTable,
+ tableBucketSnapshotLogOffset,
+ bucketEndOffset,
+ fileStoreTable));
+
+ } else {
+ // only lake data
+ splits.addAll(
+ toLakeSnapshotSplits(
+ lakeSplitsOfPartition,
+ partitionName,
+ // now, we can't get the partition id for a partition that only
+ // exists in the lake; assign an arbitrary partition id, but make
+ // sure different partitions get different ids so that they can be
+ // distributed to different tasks
+ lakeSplitPartitionId--));
+ }
+ }
+
+ // iterate remaining fluss splits
+ for (Map.Entry<String, Long> partitionIdByNameEntry : flussPartitionIdByName.entrySet()) {
+ String partitionName = partitionIdByNameEntry.getKey();
+ long partitionId = partitionIdByNameEntry.getValue();
+ Map<Integer, Long> bucketEndOffset =
+ stoppingOffsetInitializer.getBucketOffsets(
+ partitionName,
+ IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+ bucketOffsetsRetriever);
+ splits.addAll(
+ generateSplit(
+ null,
+ partitionId,
+ partitionName,
+ isLogTable,
+ // pass empty map since we won't read lake splits
+ Collections.emptyMap(),
+ bucketEndOffset,
+ fileStoreTable));
+ }
+ return splits;
+ }
+
+ private List<SourceSplitBase> generateSplit(
+ @Nullable Map<Integer, List<LakeSplit>> lakeSplits,
+ @Nullable Long partitionId,
+ @Nullable String partitionName,
+ boolean isLogTable,
+ Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+ Map<Integer, Long> bucketEndOffset,
+ @Nullable FileStoreTable fileStoreTable) {
+ List<SourceSplitBase> splits = new ArrayList<>();
+ if (isLogTable) {
+ if (lakeSplits != null) {
+ splits.addAll(toLakeSnapshotSplits(lakeSplits, partitionName, partitionId));
+ }
+ for (int bucket = 0; bucket < bucketCount; bucket++) {
+ TableBucket tableBucket =
+ new TableBucket(tableInfo.getTableId(), partitionId, bucket);
+ Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
+ long stoppingOffset = bucketEndOffset.get(bucket);
+ if (snapshotLogOffset == null) {
+ // no data has been committed to this bucket yet, scan from fluss log
+ splits.add(
+ new LogSplit(
+ tableBucket, partitionName, EARLIEST_OFFSET, stoppingOffset));
+ } else {
+ // need to read the remaining fluss log
+ if (snapshotLogOffset < stoppingOffset) {
+ splits.add(
+ new LogSplit(
+ tableBucket,
+ partitionName,
+ snapshotLogOffset,
+ stoppingOffset));
+ }
+ }
+ }
+ } else {
+ // it's primary key table
+ for (int bucket = 0; bucket < bucketCount; bucket++) {
+ TableBucket tableBucket =
+ new TableBucket(tableInfo.getTableId(), partitionId, bucket);
+ Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
+ long stoppingOffset = bucketEndOffset.get(bucket);
+ FileStoreSourceSplitGenerator splitGenerator = new FileStoreSourceSplitGenerator();
+
+ splits.add(
+ generateSplitForPrimaryKeyTableBucket(
+ fileStoreTable,
+ splitGenerator,
+ tableBucket,
+ partitionName,
+ snapshotLogOffset,
+ stoppingOffset));
+ }
+ }
+
+ return splits;
+ }
+
+ private List<SourceSplitBase> toLakeSnapshotSplits(
+ Map<Integer, List<LakeSplit>> lakeSplits,
+ @Nullable String partitionName,
+ @Nullable Long partitionId) {
+ List<SourceSplitBase> splits = new ArrayList<>();
+ for (LakeSplit lakeSplit :
+ lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) {
+ TableBucket tableBucket =
+ new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket());
+ splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit));
+ }
+ return splits;
+ }
+
+ private SourceSplitBase generateSplitForPrimaryKeyTableBucket(
+ FileStoreTable fileStoreTable,
+ FileStoreSourceSplitGenerator splitGenerator,
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ @Nullable Long snapshotLogOffset,
+ long stoppingOffset) {
+
+ // no snapshot data or no corresponding log offset for this bucket,
+ // so we can only scan from the change log
+ if (snapshotLogOffset == null || snapshotLogOffset < 0) {
+ return new PaimonSnapshotAndFlussLogSplit(
+ tableBucket, partitionName, null, EARLIEST_OFFSET, stoppingOffset);
+ }
+
+ // then, generate a split containing both the snapshot and the change log
+ // so that we can merge the change log with the snapshot
+ // to get the full data
+ fileStoreTable =
+ fileStoreTable.copy(
+ Collections.singletonMap(
+ CoreOptions.SOURCE_SPLIT_TARGET_SIZE.key(),
+ // we set a max size to make sure only one split is generated
+ MemorySize.MAX_VALUE.toString()));
+ InnerTableScan tableScan =
+ fileStoreTable.newScan().withBucketFilter((b) -> b == tableBucket.getBucket());
+
+ if (partitionName != null) {
+ tableScan =
+ tableScan.withPartitionFilter(getPartitionSpec(fileStoreTable, partitionName));
+ }
+
+ List<FileStoreSourceSplit> fileStoreSourceSplits =
+ splitGenerator.createSplits(tableScan.plan());
+
+ checkState(fileStoreSourceSplits.size() == 1, "Splits for primary key
table must be 1.");
+ FileStoreSourceSplit fileStoreSourceSplit = fileStoreSourceSplits.get(0);
+ return new PaimonSnapshotAndFlussLogSplit(
+ tableBucket,
+ partitionName,
+ fileStoreSourceSplit,
+ snapshotLogOffset,
+ stoppingOffset);
+ }
+
+ private Map<String, String> getPartitionSpec(
+ FileStoreTable fileStoreTable, String partitionName) {
+ List<String> partitionKeys = fileStoreTable.partitionKeys();
+ checkState(
+ partitionKeys.size() == 1,
+ "Must only one partition key for paimon table %, but got %s,
the partition keys are: ",
+ tableInfo.getTablePath(),
+ partitionKeys.size(),
+ partitionKeys);
+ return Collections.singletonMap(partitionKeys.get(0), partitionName);
+ }
+
+ private FileStoreTable getTable(long snapshotId, Map<String, String> catalogProperties)
+ throws Exception {
+ try (Catalog catalog =
+ FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(catalogProperties))) {
+ return (FileStoreTable)
+ catalog.getTable(
+ Identifier.create(
+ tableInfo.getTablePath().getDatabaseName(),
+ tableInfo.getTablePath().getTableName()))
+ .copy(
+ Collections.singletonMap(
+ CoreOptions.SCAN_SNAPSHOT_ID.key(),
+ String.valueOf(snapshotId)));
+ }
+ }
+
+ private List<SourceSplitBase> generateNoPartitionedTableSplit(
+ Map<Integer, List<LakeSplit>> lakeSplits,
+ boolean isLogTable,
+ Map<TableBucket, Long> tableBucketSnapshotLogOffset,
+ FileStoreTable fileStoreTable) {
+ // iterate all buckets, assuming bucket ids range from 0 to bucket count
+ Map<Integer, Long> bucketEndOffset =
+ stoppingOffsetInitializer.getBucketOffsets(
+ null,
+ IntStream.range(0, bucketCount).boxed().collect(Collectors.toList()),
+ bucketOffsetsRetriever);
+ return generateSplit(
+ lakeSplits,
+ null,
+ null,
+ isLogTable,
+ tableBucketSnapshotLogOffset,
+ bucketEndOffset,
+ fileStoreTable);
+ }
+}
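A note for readers of the new file: groupLakeSplits keys each lake split first by its joined partition name ("$"-separated) and then by bucket id. A minimal standalone sketch of that grouping, with illustrative names that are not part of this commit:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
        // Illustrative stand-in for com.alibaba.fluss.lake.source.LakeSplit.
        record Split(List<String> partition, int bucket) {}

        // partition name -> bucket id -> splits, mirroring groupLakeSplits.
        static Map<String, Map<Integer, List<Split>>> group(List<Split> splits) {
            Map<String, Map<Integer, List<Split>>> result = new HashMap<>();
            for (Split split : splits) {
                String partition = String.join("$", split.partition());
                result.computeIfAbsent(partition, k -> new HashMap<>())
                        .computeIfAbsent(split.bucket(), k -> new ArrayList<>())
                        .add(split);
            }
            return result;
        }

        public static void main(String[] args) {
            List<Split> splits =
                    List.of(
                            new Split(List.of("2025", "08"), 0),
                            new Split(List.of("2025", "08"), 0),
                            new Split(List.of("2025", "09"), 1));
            // prints {2025$08={0=[two splits]}, 2025$09={1=[one split]}}
            System.out.println(group(splits));
        }
    }

Splits of the same partition and bucket land in one list, which is what lets generatePartitionTableSplit pair a partition's lake splits with the matching fluss log offsets.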
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
similarity index 84%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
index ffe7b4bd6..02e5e5e5b 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitReaderGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -15,10 +15,11 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
-import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.table.Table;
+import com.alibaba.fluss.flink.lake.reader.LakeSnapshotScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotAndLogSplitScanner;
import com.alibaba.fluss.flink.lakehouse.paimon.reader.PaimonSnapshotScanner;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
@@ -26,6 +27,8 @@ import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
import com.alibaba.fluss.flink.source.reader.BoundedSplitReader;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.flink.utils.DataLakeUtils;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.TablePath;
import org.apache.flink.util.ExceptionUtils;
@@ -46,21 +49,21 @@ import java.util.stream.IntStream;
public class LakeSplitReaderGenerator {
private final Table table;
- private final Connection connection;
private final TablePath tablePath;
private FileStoreTable fileStoreTable;
private final @Nullable int[] projectedFields;
+ private final @Nullable LakeSource<LakeSplit> lakeSource;
public LakeSplitReaderGenerator(
Table table,
- Connection connection,
TablePath tablePath,
- @Nullable int[] projectedFields) {
+ @Nullable int[] projectedFields,
+ @Nullable LakeSource<LakeSplit> lakeSource) {
this.table = table;
- this.connection = connection;
this.tablePath = tablePath;
this.projectedFields = projectedFields;
+ this.lakeSource = lakeSource;
}
public void addSplit(SourceSplitBase split, Queue<SourceSplitBase> boundedSplits) {
@@ -68,6 +71,9 @@ public class LakeSplitReaderGenerator {
boundedSplits.add(split);
} else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
boundedSplits.add(split);
+ } else if (split instanceof LakeSnapshotSplit) {
+ boundedSplits.add(split);
+ // TODO support primary key table in https://github.com/apache/fluss/issues/1434
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
@@ -100,6 +106,13 @@ public class LakeSplitReaderGenerator {
return new BoundedSplitReader(
paimonSnapshotAndLogSplitScanner,
paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+ } else if (split instanceof LakeSnapshotSplit) {
+ LakeSnapshotSplit lakeSnapshotSplit = (LakeSnapshotSplit) split;
+ LakeSnapshotScanner lakeSnapshotScanner =
+ new LakeSnapshotScanner(lakeSource, lakeSnapshotSplit);
+ return new BoundedSplitReader(
+ lakeSnapshotScanner, lakeSnapshotSplit.getRecordsToSplit());
+ // TODO support primary key table in https://github.com/apache/fluss/issues/1434
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
new file mode 100644
index 000000000..19b6f29c7
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitSerializer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
+import com.alibaba.fluss.flink.source.split.LogSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.paimon.flink.source.FileStoreSourceSplit;
+import org.apache.paimon.flink.source.FileStoreSourceSplitSerializer;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit.LAKE_SNAPSHOT_SPLIT_KIND;
+import static com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit.PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
+
+/** A serializer for lake split. */
+public class LakeSplitSerializer {
+
+ private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer;
+
+ public LakeSplitSerializer(SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer) {
+ this.sourceSplitSerializer = sourceSplitSerializer;
+ }
+
+ public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
+ if (split instanceof LakeSnapshotSplit) {
+ byte[] serializeBytes =
+ sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
+ out.writeInt(serializeBytes.length);
+ out.write(serializeBytes);
+ } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
+ // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+ FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
+ new FileStoreSourceSplitSerializer();
+ // writing file store source split
+ PaimonSnapshotAndFlussLogSplit paimonSnapshotAndFlussLogSplit =
+ ((PaimonSnapshotAndFlussLogSplit) split);
+ FileStoreSourceSplit fileStoreSourceSplit =
+ paimonSnapshotAndFlussLogSplit.getSnapshotSplit();
+ if (fileStoreSourceSplit == null) {
+ // no snapshot data for the bucket
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ byte[] serializeBytes =
+ fileStoreSourceSplitSerializer.serialize(fileStoreSourceSplit);
+ out.writeInt(serializeBytes.length);
+ out.write(serializeBytes);
+ }
+ // writing starting/stopping offset
+ out.writeLong(paimonSnapshotAndFlussLogSplit.getStartingOffset());
+ out.writeLong(
+ paimonSnapshotAndFlussLogSplit
+ .getStoppingOffset()
+ .orElse(LogSplit.NO_STOPPING_OFFSET));
+ out.writeLong(paimonSnapshotAndFlussLogSplit.getRecordsToSkip());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported split type: " + split.getClass().getName());
+ }
+ }
+
+ public SourceSplitBase deserialize(
+ byte splitKind,
+ TableBucket tableBucket,
+ @Nullable String partition,
+ DataInputDeserializer input)
+ throws IOException {
+ if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
+ byte[] serializeBytes = new byte[input.readInt()];
+ input.read(serializeBytes);
+ LakeSplit fileStoreSourceSplit =
+ sourceSplitSerializer.deserialize(
+ sourceSplitSerializer.getVersion(), serializeBytes);
+ return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
+ // TODO support primary key table in https://github.com/apache/fluss/issues/1434
+ } else if (splitKind == PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
+ FileStoreSourceSplitSerializer fileStoreSourceSplitSerializer =
+ new FileStoreSourceSplitSerializer();
+ FileStoreSourceSplit fileStoreSourceSplit = null;
+ if (input.readBoolean()) {
+ byte[] serializeBytes = new byte[input.readInt()];
+ input.read(serializeBytes);
+ fileStoreSourceSplit =
+ fileStoreSourceSplitSerializer.deserialize(
+ fileStoreSourceSplitSerializer.getVersion(), serializeBytes);
+ }
+ long startingOffset = input.readLong();
+ long stoppingOffset = input.readLong();
+ long recordsToSkip = input.readLong();
+ return new PaimonSnapshotAndFlussLogSplit(
+ tableBucket,
+ partition,
+ fileStoreSourceSplit,
+ startingOffset,
+ stoppingOffset,
+ recordsToSkip);
+ } else {
+ throw new UnsupportedOperationException("Unsupported split kind: "
+ splitKind);
+ }
+ }
+}
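The lake-split branch above length-prefixes the serialized split (writeInt of the byte count, then the raw bytes) so that deserialize can size its buffer before reading. A minimal sketch of that framing with plain java.io streams, under assumed names (this is not the commit's API):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class LengthPrefixSketch {
        // Mirrors the writeInt(length) + write(bytes) layout used above.
        static byte[] frame(byte[] payload) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            out.writeInt(payload.length); // length prefix
            out.write(payload); // serialized split body
            return buffer.toByteArray();
        }

        static byte[] unframe(byte[] framed) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed));
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload); // readFully guards against short reads
            return payload;
        }

        public static void main(String[] args) throws IOException {
            byte[] roundTripped = unframe(frame("lake-split-bytes".getBytes()));
            System.out.println(new String(roundTripped)); // lake-split-bytes
        }
    }

One design note: DataInput.read(byte[]) may fill less than the whole array, so readFully is the defensive choice on the read side.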
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
similarity index 76%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
index 1fbd20a25..aae8987ad 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeSplitStateInitializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitStateInitializer.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.lake.state.LakeSnapshotSplitState;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplit;
import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotAndFlussLogSplitState;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplit;
-import com.alibaba.fluss.flink.lakehouse.paimon.split.PaimonSnapshotSplitState;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.flink.source.split.SourceSplitState;
@@ -28,10 +28,11 @@ import com.alibaba.fluss.flink.source.split.SourceSplitState;
public class LakeSplitStateInitializer {
public static SourceSplitState initializedState(SourceSplitBase split) {
- if (split instanceof PaimonSnapshotSplit) {
- return new PaimonSnapshotSplitState((PaimonSnapshotSplit) split);
- } else if (split instanceof PaimonSnapshotAndFlussLogSplit) {
+ if (split instanceof PaimonSnapshotAndFlussLogSplit) {
return new PaimonSnapshotAndFlussLogSplitState((PaimonSnapshotAndFlussLogSplit) split);
+ } else if (split instanceof LakeSnapshotSplit) {
+ return new LakeSnapshotSplitState((LakeSnapshotSplit) split);
+ // TODO support primary key table in https://github.com/apache/fluss/issues/1434
} else {
throw new UnsupportedOperationException("Unsupported split type: "
+ split);
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
similarity index 98%
rename from fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java
rename to fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
index f5ea9a3d3..5092a59e6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/LakeTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeTableFactory.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.flink.lakehouse;
+package com.alibaba.fluss.flink.lake;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java
new file mode 100644
index 000000000..8a51482ba
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/reader/LakeSnapshotScanner.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.reader;
+
+import com.alibaba.fluss.client.table.scanner.batch.BatchScanner;
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.record.LogRecord;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+
+/** A scanner for reading lake split {@link LakeSnapshotSplit}. */
+public class LakeSnapshotScanner implements BatchScanner {
+
+ private final LakeSource<LakeSplit> lakeSource;
+ private final LakeSnapshotSplit lakeSnapshotSplit;
+
+ private CloseableIterator<InternalRow> rowsIterator;
+
+ public LakeSnapshotScanner(
+ LakeSource<LakeSplit> lakeSource, LakeSnapshotSplit lakeSnapshotSplit) {
+ this.lakeSource = lakeSource;
+ this.lakeSnapshotSplit = lakeSnapshotSplit;
+ }
+
+ @Nullable
+ @Override
+ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws IOException {
+ if (rowsIterator == null) {
+ rowsIterator =
+ InternalRowIterator.wrap(
+ lakeSource
+ .createRecordReader(
+ (LakeSource.ReaderContext<LakeSplit>) lakeSnapshotSplit::getLakeSplit)
+ .read());
+ }
+ return rowsIterator.hasNext() ? rowsIterator : null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (rowsIterator != null) {
+ rowsIterator.close();
+ }
+ }
+
+ private static class InternalRowIterator implements CloseableIterator<InternalRow> {
+
+ private final CloseableIterator<LogRecord> recordCloseableIterator;
+
+ private static InternalRowIterator wrap(
+ CloseableIterator<LogRecord> recordCloseableIterator) {
+ return new InternalRowIterator(recordCloseableIterator);
+ }
+
+ private InternalRowIterator(CloseableIterator<LogRecord> recordCloseableIterator) {
+ this.recordCloseableIterator = recordCloseableIterator;
+ }
+
+ @Override
+ public void close() {
+ recordCloseableIterator.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return recordCloseableIterator.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ return recordCloseableIterator.next().getRow();
+ }
+ }
+}
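The scanner opens the lake record reader lazily on the first pollBatch call and signals exhaustion by returning null, so a caller drains it with a simple poll-until-null loop. A self-contained sketch of that contract, under assumed interfaces (not Fluss's actual API):

    import java.time.Duration;
    import java.util.Iterator;
    import java.util.List;

    public class DrainSketch {
        // Assumed stand-in for the BatchScanner contract: null means exhausted.
        interface Scanner<T> extends AutoCloseable {
            Iterator<T> pollBatch(Duration timeout);

            @Override
            void close();
        }

        static <T> void drain(Scanner<T> scanner) {
            try (scanner) {
                Iterator<T> batch;
                while ((batch = scanner.pollBatch(Duration.ofSeconds(1))) != null) {
                    while (batch.hasNext()) {
                        System.out.println(batch.next()); // process the row
                    }
                }
            }
        }

        public static void main(String[] args) {
            Iterator<String> rows = List.of("row-1", "row-2").iterator();
            // Like LakeSnapshotScanner: same iterator until drained, then null.
            drain(
                    new Scanner<>() {
                        @Override
                        public Iterator<String> pollBatch(Duration timeout) {
                            return rows.hasNext() ? rows : null;
                        }

                        @Override
                        public void close() {}
                    });
        }
    }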
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java
new file mode 100644
index 000000000..b2460177a
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.split;
+
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TableBucket;
+
+import javax.annotation.Nullable;
+
+/** A split for reading a snapshot of lake. */
+public class LakeSnapshotSplit extends SourceSplitBase {
+
+ public static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1;
+
+ private final LakeSplit lakeSplit;
+
+ private final long recordsToSplit;
+
+ public LakeSnapshotSplit(
+ TableBucket tableBucket, @Nullable String partitionName, LakeSplit lakeSplit) {
+ this(tableBucket, partitionName, lakeSplit, 0);
+ }
+
+ public LakeSnapshotSplit(
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ LakeSplit lakeSplit,
+ long recordsToSplit) {
+ super(tableBucket, partitionName);
+ this.lakeSplit = lakeSplit;
+ this.recordsToSplit = recordsToSplit;
+ }
+
+ public LakeSplit getLakeSplit() {
+ return lakeSplit;
+ }
+
+ public long getRecordsToSplit() {
+ return recordsToSplit;
+ }
+
+ @Override
+ public String splitId() {
+ return toSplitId(
+ "lake-snapshot-",
+ new TableBucket(
+ tableBucket.getTableId(),
+ tableBucket.getPartitionId(),
+ lakeSplit.bucket()));
+ }
+
+ @Override
+ public boolean isLakeSplit() {
+ return true;
+ }
+
+ @Override
+ public byte splitKind() {
+ return LAKE_SNAPSHOT_SPLIT_KIND;
+ }
+}
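splitId() folds the lake split's own bucket into the id, and splitKind() returns the LAKE_SNAPSHOT_SPLIT_KIND byte (-1) that the shared split serializer can later dispatch on when deserializing. A tiny illustration of that kind-byte dispatch; only the -1 value is pinned down by this file, the second constant's value is an assumption for the sketch:

    public class KindDispatchSketch {
        static final byte LAKE_SNAPSHOT_SPLIT_KIND = -1; // from LakeSnapshotSplit
        static final byte PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2; // assumed value

        static String dispatch(byte splitKind) {
            switch (splitKind) {
                case LAKE_SNAPSHOT_SPLIT_KIND:
                    return "LakeSnapshotSplit";
                case PAIMON_SNAPSHOT_FLUSS_LOG_SPLIT_KIND:
                    return "PaimonSnapshotAndFlussLogSplit";
                default:
                    throw new UnsupportedOperationException(
                            "Unsupported split kind: " + splitKind);
            }
        }

        public static void main(String[] args) {
            System.out.println(dispatch(LAKE_SNAPSHOT_SPLIT_KIND)); // LakeSnapshotSplit
        }
    }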
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java
new file mode 100644
index 000000000..ae6eb3fa4
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.lake.state;
+
+import com.alibaba.fluss.flink.lake.split.LakeSnapshotSplit;
+import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.flink.source.split.SourceSplitState;
+
+/** The state of {@link LakeSnapshotSplit}. */
+public class LakeSnapshotSplitState extends SourceSplitState {
+
+ private final LakeSnapshotSplit split;
+ private long recordsToSplit;
+
+ public LakeSnapshotSplitState(LakeSnapshotSplit split) {
+ super(split);
+ this.split = split;
+ this.recordsToSplit = split.getRecordsToSplit();
+ }
+
+ public void setRecordsToSkip(long recordsToSkip) {
+ this.recordsToSplit = recordsToSkip;
+ }
+
+ @Override
+ public SourceSplitBase toSourceSplit() {
+ return new LakeSnapshotSplit(
+ split.getTableBucket(),
+ split.getPartitionName(),
+ split.getLakeSplit(),
+ recordsToSplit);
+ }
+}
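LakeSnapshotSplitState follows the standard Flink split/state round trip: the immutable split carries the record count checkpointed so far, the mutable state advances it while reading (via setRecordsToSkip), and toSourceSplit snapshots it back into a split for the next checkpoint. A compact illustration with hypothetical names (note the commit stores the value in a field called recordsToSplit even though it semantically counts records to skip on restore):

    public class SplitStateSketch {
        // Immutable split: what a checkpoint serializes (assumed shape).
        record Split(String id, long recordsToSkip) {}

        // Mutable per-reader state that advances as records are emitted.
        static class SplitState {
            private final Split split;
            private long recordsToSkip;

            SplitState(Split split) {
                this.split = split;
                this.recordsToSkip = split.recordsToSkip();
            }

            void setRecordsToSkip(long recordsToSkip) {
                this.recordsToSkip = recordsToSkip;
            }

            // Snapshot progress back into an immutable split, like toSourceSplit.
            Split toSourceSplit() {
                return new Split(split.id(), recordsToSkip);
            }
        }

        public static void main(String[] args) {
            SplitState state = new SplitState(new Split("lake-snapshot-0", 0L));
            state.setRecordsToSkip(42L); // the emitter records progress per record
            System.out.println(state.toSourceSplit()); // recordsToSkip=42
        }
    }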
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
index d806164e0..1b4f750b0 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
@@ -31,6 +31,8 @@ import com.alibaba.fluss.flink.source.split.SourceSplitSerializer;
import com.alibaba.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.RowType;
@@ -70,6 +72,8 @@ public class FlinkSource<OUT>
private final List<FieldEqual> partitionFilters;
+ private final @Nullable LakeSource<LakeSplit> lakeSource;
+
public FlinkSource(
Configuration flussConf,
TablePath tablePath,
@@ -82,6 +86,34 @@ public class FlinkSource<OUT>
FlussDeserializationSchema<OUT> deserializationSchema,
boolean streaming,
List<FieldEqual> partitionFilters) {
+ this(
+ flussConf,
+ tablePath,
+ hasPrimaryKey,
+ isPartitioned,
+ sourceOutputType,
+ projectedFields,
+ offsetsInitializer,
+ scanPartitionDiscoveryIntervalMs,
+ deserializationSchema,
+ streaming,
+ partitionFilters,
+ null);
+ }
+
+ public FlinkSource(
+ Configuration flussConf,
+ TablePath tablePath,
+ boolean hasPrimaryKey,
+ boolean isPartitioned,
+ RowType sourceOutputType,
+ @Nullable int[] projectedFields,
+ OffsetsInitializer offsetsInitializer,
+ long scanPartitionDiscoveryIntervalMs,
+ FlussDeserializationSchema<OUT> deserializationSchema,
+ boolean streaming,
+ List<FieldEqual> partitionFilters,
+ LakeSource<LakeSplit> lakeSource) {
this.flussConf = flussConf;
this.tablePath = tablePath;
this.hasPrimaryKey = hasPrimaryKey;
@@ -93,6 +125,7 @@ public class FlinkSource<OUT>
this.deserializationSchema = deserializationSchema;
this.streaming = streaming;
this.partitionFilters = checkNotNull(partitionFilters);
+ this.lakeSource = lakeSource;
}
@Override
@@ -112,7 +145,8 @@ public class FlinkSource<OUT>
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
- partitionFilters);
+ partitionFilters,
+ lakeSource);
}
@Override
@@ -130,12 +164,13 @@ public class FlinkSource<OUT>
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
- partitionFilters);
+ partitionFilters,
+ lakeSource);
}
@Override
public SimpleVersionedSerializer<SourceSplitBase> getSplitSerializer() {
- return SourceSplitSerializer.INSTANCE;
+ return new SourceSplitSerializer(lakeSource);
}
@Override
@@ -166,7 +201,8 @@ public class FlinkSource<OUT>
context,
projectedFields,
flinkSourceReaderMetrics,
- recordEmitter);
+ recordEmitter,
+ lakeSource);
}
@Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
index 65e8ac813..deb986a50 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java
@@ -28,6 +28,8 @@ import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
import com.alibaba.fluss.flink.utils.FlinkConversions;
import com.alibaba.fluss.flink.utils.PushdownUtils;
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.MergeEngineType;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.RowType;
@@ -76,6 +78,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import static com.alibaba.fluss.flink.utils.LakeSourceUtils.createLakeSource;
import static com.alibaba.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
import static com.alibaba.fluss.flink.utils.PushdownUtils.extractFieldEquals;
import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
@@ -130,6 +133,10 @@ public class FlinkTableSource
private List<FieldEqual> partitionFilters = Collections.emptyList();
+ private final Map<String, String> tableOptions;
+
+ @Nullable private LakeSource<LakeSplit> lakeSource;
+
public FlinkTableSource(
TablePath tablePath,
Configuration flussConfig,
@@ -144,7 +151,8 @@ public class FlinkTableSource
@Nullable LookupCache cache,
long scanPartitionDiscoveryIntervalMs,
boolean isDataLakeEnabled,
- @Nullable MergeEngineType mergeEngineType) {
+ @Nullable MergeEngineType mergeEngineType,
+ Map<String, String> tableOptions) {
this.tablePath = tablePath;
this.flussConfig = flussConfig;
this.tableOutputType = tableOutputType;
@@ -162,6 +170,10 @@ public class FlinkTableSource
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.isDataLakeEnabled = isDataLakeEnabled;
this.mergeEngineType = mergeEngineType;
+ this.tableOptions = tableOptions;
+ if (isDataLakeEnabled) {
+ this.lakeSource = createLakeSource(tablePath, tableOptions);
+ }
}
@Override
@@ -270,7 +282,8 @@ public class FlinkTableSource
scanPartitionDiscoveryIntervalMs,
new RowDataDeserializationSchema(),
streaming,
- partitionFilters);
+ partitionFilters,
+ lakeSource);
if (!streaming) {
// return a bounded source provider to make the planner happy,
@@ -359,12 +372,14 @@ public class FlinkTableSource
cache,
scanPartitionDiscoveryIntervalMs,
isDataLakeEnabled,
- mergeEngineType);
+ mergeEngineType,
+ tableOptions);
source.producedDataType = producedDataType;
source.projectedFields = projectedFields;
source.singleRowFilter = singleRowFilter;
source.modificationScanType = modificationScanType;
source.partitionFilters = partitionFilters;
+ source.lakeSource = lakeSource;
return source;
}
@@ -382,10 +397,17 @@ public class FlinkTableSource
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
this.projectedFields = Arrays.stream(projectedFields).mapToInt(value -> value[0]).toArray();
this.producedDataType = producedDataType.getLogicalType();
+ if (lakeSource != null) {
+ lakeSource.withProject(projectedFields);
+ }
}
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
+ if (lakeSource != null) {
+ // todo: use real filters
+ }
+
List<ResolvedExpression> acceptedFilters = new ArrayList<>();
List<ResolvedExpression> remainingFilters = new ArrayList<>();
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
index 3e62fdd0e..84602ca35 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/emitter/FlinkRecordEmitter.java
@@ -18,7 +18,7 @@
package com.alibaba.fluss.flink.source.emitter;
import com.alibaba.fluss.client.table.scanner.ScanRecord;
-import com.alibaba.fluss.flink.lakehouse.LakeRecordRecordEmitter;
+import com.alibaba.fluss.flink.lake.LakeRecordRecordEmitter;
import com.alibaba.fluss.flink.source.deserializer.FlussDeserializationSchema;
import com.alibaba.fluss.flink.source.reader.FlinkSourceReader;
import com.alibaba.fluss.flink.source.reader.RecordAndPos;
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e65e64464..05ac7fa6e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -23,7 +23,7 @@ import com.alibaba.fluss.client.admin.Admin;
import com.alibaba.fluss.client.metadata.KvSnapshots;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitGenerator;
+import com.alibaba.fluss.flink.lake.LakeSplitGenerator;
import com.alibaba.fluss.flink.source.enumerator.initializer.BucketOffsetsRetrieverImpl;
import com.alibaba.fluss.flink.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import com.alibaba.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -36,6 +36,8 @@ import com.alibaba.fluss.flink.source.split.LogSplit;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.flink.source.state.SourceEnumeratorState;
import com.alibaba.fluss.flink.utils.PushdownUtils.FieldEqual;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TableInfo;
@@ -130,6 +132,8 @@ public class FlinkSourceEnumerator
private final List<FieldEqual> partitionFilters;
+ @Nullable private final LakeSource<LakeSplit> lakeSource;
+
public FlinkSourceEnumerator(
TablePath tablePath,
Configuration flussConf,
@@ -140,6 +144,30 @@ public class FlinkSourceEnumerator
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
List<FieldEqual> partitionFilters) {
+ this(
+ tablePath,
+ flussConf,
+ hasPrimaryKey,
+ isPartitioned,
+ context,
+ startingOffsetsInitializer,
+ scanPartitionDiscoveryIntervalMs,
+ streaming,
+ partitionFilters,
+ null);
+ }
+
+ public FlinkSourceEnumerator(
+ TablePath tablePath,
+ Configuration flussConf,
+ boolean hasPrimaryKey,
+ boolean isPartitioned,
+ SplitEnumeratorContext<SourceSplitBase> context,
+ OffsetsInitializer startingOffsetsInitializer,
+ long scanPartitionDiscoveryIntervalMs,
+ boolean streaming,
+ List<FieldEqual> partitionFilters,
+ LakeSource<LakeSplit> lakeSource) {
this(
tablePath,
flussConf,
@@ -151,7 +179,8 @@ public class FlinkSourceEnumerator
startingOffsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
- partitionFilters);
+ partitionFilters,
+ lakeSource);
}
public FlinkSourceEnumerator(
@@ -165,7 +194,8 @@ public class FlinkSourceEnumerator
OffsetsInitializer startingOffsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
- List<FieldEqual> partitionFilters) {
+ List<FieldEqual> partitionFilters,
+ @Nullable LakeSource<LakeSplit> lakeSource) {
this.tablePath = checkNotNull(tablePath);
this.flussConf = checkNotNull(flussConf);
this.hasPrimaryKey = hasPrimaryKey;
@@ -180,6 +210,7 @@ public class FlinkSourceEnumerator
this.partitionFilters = checkNotNull(partitionFilters);
this.stoppingOffsetsInitializer =
streaming ? new NoStoppingOffsetsInitializer() : OffsetsInitializer.latest();
+ this.lakeSource = lakeSource;
}
@Override
@@ -485,10 +516,12 @@ public class FlinkSourceEnumerator
new LakeSplitGenerator(
tableInfo,
flussAdmin,
+ lakeSource,
bucketOffsetsRetriever,
stoppingOffsetsInitializer,
tableInfo.getNumBuckets());
- return lakeSplitGenerator.generateLakeSplits();
+ List<SourceSplitBase> lakeSplits = lakeSplitGenerator.generateHybridLakeSplits();
+ return lakeSplits;
}
private boolean ignoreTableBucket(TableBucket tableBucket) {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
index ba7f6642a..d2db9ac8c 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReader.java
@@ -18,7 +18,7 @@
package com.alibaba.fluss.flink.source.reader;
import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitStateInitializer;
+import com.alibaba.fluss.flink.lake.LakeSplitStateInitializer;
import com.alibaba.fluss.flink.source.emitter.FlinkRecordEmitter;
import com.alibaba.fluss.flink.source.event.PartitionBucketsUnsubscribedEvent;
import com.alibaba.fluss.flink.source.event.PartitionsRemovedEvent;
@@ -28,6 +28,8 @@ import com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplitState;
import com.alibaba.fluss.flink.source.split.LogSplitState;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
import com.alibaba.fluss.flink.source.split.SourceSplitState;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.RowType;
@@ -57,7 +59,8 @@ public class FlinkSourceReader<OUT>
SourceReaderContext context,
@Nullable int[] projectedFields,
FlinkSourceReaderMetrics flinkSourceReaderMetrics,
- FlinkRecordEmitter<OUT> recordEmitter) {
+ FlinkRecordEmitter<OUT> recordEmitter,
+ LakeSource<LakeSplit> lakeSource) {
super(
elementsQueue,
new FlinkSourceFetcherManager(
@@ -68,7 +71,8 @@ public class FlinkSourceReader<OUT>
tablePath,
sourceOutputType,
projectedFields,
- flinkSourceReaderMetrics),
+ flinkSourceReaderMetrics,
+ lakeSource),
(ignore) -> {}),
recordEmitter,
context.getConfiguration(),
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
index 55c57c79c..ac18b5283 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -26,13 +26,15 @@ import com.alibaba.fluss.client.table.scanner.log.LogScanner;
import com.alibaba.fluss.client.table.scanner.log.ScanRecords;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.PartitionNotExistException;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitReaderGenerator;
+import com.alibaba.fluss.flink.lake.LakeSplitReaderGenerator;
import com.alibaba.fluss.flink.metrics.FlinkMetricRegistry;
import com.alibaba.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
import com.alibaba.fluss.flink.source.split.HybridSnapshotLogSplit;
import com.alibaba.fluss.flink.source.split.LogSplit;
import com.alibaba.fluss.flink.source.split.SnapshotSplit;
import com.alibaba.fluss.flink.source.split.SourceSplitBase;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.RowType;
@@ -99,6 +101,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
private final Table table;
private final FlinkMetricRegistry flinkMetricRegistry;
+ @Nullable private LakeSource<LakeSplit> lakeSource;
+
// table id, will be null when no split has been received yet
private Long tableId;
@@ -116,7 +120,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
TablePath tablePath,
RowType sourceOutputType,
@Nullable int[] projectedFields,
- FlinkSourceReaderMetrics flinkSourceReaderMetrics) {
+ FlinkSourceReaderMetrics flinkSourceReaderMetrics,
+ @Nullable LakeSource<LakeSplit> lakeSource) {
        this.flinkMetricRegistry =
                new FlinkMetricRegistry(flinkSourceReaderMetrics.getSourceReaderMetricGroup());
        this.connection = ConnectionFactory.createConnection(flussConf, flinkMetricRegistry);
@@ -131,6 +136,7 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
this.logScanner =
table.newScan().project(projectedFields).createLogScanner();
this.stoppingOffsets = new HashMap<>();
this.emptyLogSplits = new HashSet<>();
+ this.lakeSource = lakeSource;
}
@Override
@@ -216,7 +222,8 @@ public class FlinkSourceSplitReader implements SplitReader<RecordAndPos, SourceS
private LakeSplitReaderGenerator getLakeSplitReader() {
if (lakeSplitReaderGenerator == null) {
lakeSplitReaderGenerator =
-                    new LakeSplitReaderGenerator(table, connection, tablePath, projectedFields);
+                    new LakeSplitReaderGenerator(
+                            table, tablePath, projectedFields, checkNotNull(lakeSource));
}
return lakeSplitReaderGenerator;
}
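The split reader builds its LakeSplitReaderGenerator lazily, on the first lake split it is handed; checkNotNull(lakeSource) turns a mis-wired enumerator (one that assigns lake splits to a reader created without a lake source) into an immediate failure rather than an obscure error deep inside the lake reader. A condensed sketch of that guard, under the same assumption that a lake split implies a configured lake source:

    private LakeSplitReaderGenerator getLakeSplitReader() {
        if (lakeSplitReaderGenerator == null) {
            // fails fast if a lake split arrives while lakeSource is null
            lakeSplitReaderGenerator =
                    new LakeSplitReaderGenerator(
                            table, tablePath, projectedFields, checkNotNull(lakeSource));
        }
        return lakeSplitReaderGenerator;
    }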
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
index dda093086..cdbbd3216 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializer.java
@@ -17,20 +17,24 @@
package com.alibaba.fluss.flink.source.split;
-import com.alibaba.fluss.flink.lakehouse.LakeSplitSerializer;
+import com.alibaba.fluss.flink.lake.LakeSplitSerializer;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
import com.alibaba.fluss.metadata.TableBucket;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
/** A serializer for the {@link SourceSplitBase}. */
public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSplitBase> {
-    public static final SourceSplitSerializer INSTANCE = new SourceSplitSerializer();
-
private static final int VERSION_0 = 0;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -41,7 +45,11 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp
private static final int CURRENT_VERSION = VERSION_0;
- private LakeSplitSerializer lakeSplitSerializer;
+ @Nullable private final LakeSource<LakeSplit> lakeSource;
+
+ public SourceSplitSerializer(LakeSource<LakeSplit> lakeSource) {
+ this.lakeSource = lakeSource;
+ }
@Override
public int getVersion() {
@@ -75,7 +83,9 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp
out.writeLong(logSplit.getStoppingOffset().orElse(LogSplit.NO_STOPPING_OFFSET));
}
} else {
- getLakeSplitSerializer().serialize(out, split);
+ LakeSplitSerializer lakeSplitSerializer =
+                    new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer());
+ lakeSplitSerializer.serialize(out, split);
}
final byte[] result = out.getCopyOfBuffer();
@@ -135,14 +145,9 @@ public class SourceSplitSerializer implements SimpleVersionedSerializer<SourceSp
long stoppingOffset = in.readLong();
            return new LogSplit(tableBucket, partitionName, startingOffset, stoppingOffset);
} else {
-            return getLakeSplitSerializer().deserialize(splitKind, tableBucket, partitionName, in);
- }
- }
-
- private LakeSplitSerializer getLakeSplitSerializer() {
- if (lakeSplitSerializer == null) {
- lakeSplitSerializer = new LakeSplitSerializer();
+ LakeSplitSerializer lakeSplitSerializer =
+                    new LakeSplitSerializer(checkNotNull(lakeSource).getSplitSerializer());
+            return lakeSplitSerializer.deserialize(splitKind, tableBucket, partitionName, in);
}
- return lakeSplitSerializer;
}
}
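Dropping the shared INSTANCE above follows from the serializer now carrying state: it needs the lake source's own split serializer to round-trip lake splits, so a per-source instance is constructed instead, with null remaining valid for tables that have no lake source. A usage sketch against Flink's SimpleVersionedSerializer contract (the split instance is elided):

    SourceSplitSerializer serializer = new SourceSplitSerializer(null); // no lake source
    byte[] bytes = serializer.serialize(split);                         // throws IOException
    SourceSplitBase restored = serializer.deserialize(serializer.getVersion(), bytes);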
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
index da65bd646..1c95cf87e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/FlinkConversions.java
@@ -24,6 +24,7 @@ import com.alibaba.fluss.config.MemorySize;
import com.alibaba.fluss.config.Password;
import com.alibaba.fluss.flink.FlinkConnectorOptions;
import com.alibaba.fluss.flink.catalog.FlinkCatalogFactory;
+import com.alibaba.fluss.metadata.DataLakeFormat;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
@@ -51,6 +52,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import static com.alibaba.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
@@ -94,6 +96,20 @@ public class FlinkConversions {
        // put fluss table properties into flink options, to make the properties visible to users
        convertFlussTablePropertiesToFlinkOptions(tableInfo.getProperties().toMap(), newOptions);
+        // put lake-related options into the table options
+        Optional<DataLakeFormat> optDataLakeFormat = tableInfo.getTableConfig().getDataLakeFormat();
+ if (optDataLakeFormat.isPresent()) {
+ DataLakeFormat dataLakeFormat = optDataLakeFormat.get();
+ String dataLakePrefix = "table.datalake." + dataLakeFormat + ".";
+
+ for (Map.Entry<String, String> tableProperty :
+ tableInfo.getProperties().toMap().entrySet()) {
+ if (tableProperty.getKey().startsWith(dataLakePrefix)) {
+                    newOptions.put(tableProperty.getKey(), tableProperty.getValue());
+ }
+ }
+ }
+
org.apache.flink.table.api.Schema.Builder schemaBuilder =
org.apache.flink.table.api.Schema.newBuilder();
if (tableInfo.hasPrimaryKey()) {
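The added block copies every table property under the format-specific prefix into the Flink options. Assuming a Paimon-backed table and DataLakeFormat.PAIMON rendering as "paimon" (the property keys and values below are made up for illustration), the filter reduces to a plain prefix match:

    String dataLakePrefix = "table.datalake." + DataLakeFormat.PAIMON + "."; // "table.datalake.paimon."
    Map<String, String> props = new HashMap<>();
    props.put("table.datalake.paimon.file.format", "orc"); // hypothetical key: copied
    props.put("table.log.ttl", "7d");                      // unrelated key: skipped
    Map<String, String> newOptions = new HashMap<>();
    for (Map.Entry<String, String> e : props.entrySet()) {
        if (e.getKey().startsWith(dataLakePrefix)) {
            newOptions.put(e.getKey(), e.getValue());
        }
    }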
diff --git a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java
new file mode 100644
index 000000000..6b849e243
--- /dev/null
+++ b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/LakeSourceUtils.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.flink.utils;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import com.alibaba.fluss.lake.source.LakeSource;
+import com.alibaba.fluss.lake.source.LakeSplit;
+import com.alibaba.fluss.metadata.TablePath;
+
+import java.util.Map;
+
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+
+/** Utils for creating a lake source. */
+public class LakeSourceUtils {
+
+ @SuppressWarnings("unchecked")
+ public static LakeSource<LakeSplit> createLakeSource(
+ TablePath tablePath, Map<String, String> properties) {
+ Map<String, String> catalogProperties =
+                DataLakeUtils.extractLakeCatalogProperties(Configuration.fromMap(properties));
+ Configuration lakeConfig = Configuration.fromMap(catalogProperties);
+
+ String dataLake =
+ Configuration.fromMap(properties)
+ .get(ConfigOptions.TABLE_DATALAKE_FORMAT)
+ .toString();
+ LakeStoragePlugin lakeStoragePlugin =
+ LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+        LakeStorage lakeStorage = checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
+ return (LakeSource<LakeSplit>) lakeStorage.createLakeSource(tablePath);
+ }
+}
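A hypothetical invocation of the new utility (database, table, and option values are illustrative): the properties map must carry table.datalake.format, and DataLakeUtils.extractLakeCatalogProperties pulls out whatever catalog options the chosen lake format needs:

    TablePath tablePath = TablePath.of("fluss", "orders");
    Map<String, String> properties = new HashMap<>();
    properties.put("table.datalake.format", "paimon");
    // plus the lake catalog options for the chosen format, e.g. a warehouse path
    LakeSource<LakeSplit> lakeSource =
            LakeSourceUtils.createLakeSource(tablePath, properties);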
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index e518f7b05..a7594a0b5 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -356,7 +356,8 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
OffsetsInitializer.earliest(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
- Collections.emptyList());
+ Collections.emptyList(),
+ null);
enumerator.start();
assertThat(context.getSplitsAssignmentSequence()).isEmpty();
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
index d29749853..4ce072631 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceReaderTest.java
@@ -180,6 +180,7 @@ class FlinkSourceReaderTest extends FlinkTestBase {
context,
null,
new FlinkSourceReaderMetrics(context.metricGroup()),
- recordEmitter);
+ recordEmitter,
+ null);
}
}
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
index 29ead7607..4984b7d7b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/reader/FlinkSourceSplitReaderTest.java
@@ -89,7 +89,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
DataTypes.FIELD("name",
DataTypes.STRING()),
DataTypes.FIELD("age",
DataTypes.INT())),
null,
- createMockSourceReaderMetrics()))
+ createMockSourceReaderMetrics(),
+ null))
.isInstanceOf(ValidationException.class)
.hasMessage(
"The Flink query schema is not matched to Fluss table
schema. \n"
@@ -106,7 +107,8 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
"id",
DataTypes.BIGINT().copy(false)),
DataTypes.FIELD("name",
DataTypes.STRING())),
new int[] {1, 0},
- createMockSourceReaderMetrics()))
+ createMockSourceReaderMetrics(),
+ null))
.isInstanceOf(ValidationException.class)
.hasMessage(
"The Flink query schema is not matched to Fluss table
schema. \n"
@@ -394,7 +396,7 @@ class FlinkSourceSplitReaderTest extends FlinkTestBase {
    private FlinkSourceSplitReader createSplitReader(TablePath tablePath, RowType rowType) {
return new FlinkSourceSplitReader(
-                clientConf, tablePath, rowType, null, createMockSourceReaderMetrics());
+                clientConf, tablePath, rowType, null, createMockSourceReaderMetrics(), null);
}
private FlinkSourceReaderMetrics createMockSourceReaderMetrics() {
diff --git a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
index da6e7830b..7491eb568 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/split/SourceSplitSerializerTest.java
@@ -31,7 +31,7 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
class SourceSplitSerializerTest {
-    private static final SourceSplitSerializer serializer = SourceSplitSerializer.INSTANCE;
+    private static final SourceSplitSerializer serializer = new SourceSplitSerializer(null);
private static final TableBucket tableBucket = new TableBucket(1, 2);
    private static final TableBucket partitionedTableBucket = new TableBucket(1, 100L, 2);
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index d4015908b..aa565fbaa 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -319,6 +319,7 @@
<exclude>com.alibaba.fluss.metrics.*</exclude>
<!-- end exclude for metric -->
<exclude>com.alibaba.fluss.flink.lakehouse.*</exclude>
+                        <exclude>com.alibaba.fluss.flink.lake.*</exclude>
<exclude>com.alibaba.fluss.kafka.*</exclude>
<!-- exclude for fluss-ci-tools -->
<exclude>com.alibaba.fluss.tools.ci.*</exclude>