This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 79379968d [flink] Union read support log table in streaming mode (#1575)
79379968d is described below
commit 79379968d0f8b36ac48fb0636a3fd65bb06ea84d
Author: CaoZhen <[email protected]>
AuthorDate: Wed Aug 27 09:55:43 2025 +0800
[flink] Union read support log table in streaming mode (#1575)
---
.../java/org/apache/fluss/client/admin/Admin.java | 2 +
.../fluss/flink/lake/LakeSplitGenerator.java | 49 ++++---
.../fluss/flink/lake/LakeSplitSerializer.java | 10 +-
.../lake/split/LakeSnapshotAndFlussLogSplit.java | 19 +++
.../fluss/flink/lake/split/LakeSnapshotSplit.java | 44 ++++++-
.../flink/lake/state/LakeSnapshotSplitState.java | 1 +
.../org/apache/fluss/flink/source/FlinkSource.java | 3 +-
.../source/enumerator/FlinkSourceEnumerator.java | 141 ++++++++++++++++-----
.../initializer/NoStoppingOffsetsInitializer.java | 4 +-
.../FlussSourceEnumeratorStateSerializer.java | 71 ++++++++++-
.../flink/source/state/SourceEnumeratorState.java | 18 ++-
.../source/enumerator/TieringSourceEnumerator.java | 6 +
.../fluss/flink/lake/LakeSplitSerializerTest.java | 2 +-
.../enumerator/FlinkSourceEnumeratorTest.java | 1 +
.../state/SourceEnumeratorStateSerializerTest.java | 6 +-
fluss-lake/fluss-lake-paimon/pom.xml | 1 +
.../paimon/flink/FlinkUnionReadLogTableITCase.java | 50 +++++++-
.../lake/paimon/flink/FlinkUnionReadTestBase.java | 19 +++
18 files changed, 371 insertions(+), 76 deletions(-)
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index 6ce225d25..baaab3507 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -31,6 +31,7 @@ import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidReplicationFactorException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.KvSnapshotNotExistException;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
@@ -383,6 +384,7 @@ public interface Admin extends AutoCloseable {
*
* <ul>
* <li>{@link TableNotExistException} if the table does not exist.
+ * <li>{@link LakeTableSnapshotNotExistException} if no lake snapshot exists.
* </ul>
*
* @param tablePath the table path of the table.
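Since the exception surfaces from the returned future, a caller has to unwrap the ExecutionException before matching on it. A minimal caller sketch (admin and tablePath assumed in scope), mirroring the handling LakeSplitGenerator adopts below:

    LakeSnapshot snapshot;
    try {
        snapshot = admin.getLatestLakeSnapshot(tablePath).get();
    } catch (Exception e) {
        if (ExceptionUtils.stripExecutionException(e)
                instanceof LakeTableSnapshotNotExistException) {
            snapshot = null; // no lake snapshot committed yet -- treat as "no lake data"
        } else {
            throw e;
        }
    }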
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index 1854fd26f..e420de630 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.lake;
import org.apache.fluss.client.admin.Admin;
import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -29,6 +30,7 @@ import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.PartitionInfo;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.utils.ExceptionUtils;
import javax.annotation.Nullable;
@@ -75,10 +77,23 @@ public class LakeSplitGenerator {
this.listPartitionSupplier = listPartitionSupplier;
}
- public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
- // get the file store
- LakeSnapshot lakeSnapshotInfo =
- flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+ /**
+ * Returns a list of hybrid splits: {@link LakeSnapshotSplit}, {@link
+ * LakeSnapshotAndFlussLogSplit}, and the corresponding Fluss {@link LogSplit}, based on the
+ * lake snapshot. Returns null if no lake snapshot exists.
+ */
+ @Nullable
+ public List<SourceSplitBase> generateHybridLakeFlussSplits() throws Exception {
+ LakeSnapshot lakeSnapshotInfo;
+ try {
+ lakeSnapshotInfo = flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+ } catch (Exception exception) {
+ if (ExceptionUtils.stripExecutionException(exception)
+ instanceof LakeTableSnapshotNotExistException) {
+ return null;
+ }
+ throw exception;
+ }
boolean isLogTable = !tableInfo.hasPrimaryKey();
boolean isPartitioned = tableInfo.isPartitioned();
@@ -90,10 +105,7 @@ public class LakeSplitGenerator {
(LakeSource.PlannerContext)
lakeSnapshotInfo::getSnapshotId)
.plan());
- if (lakeSplits.isEmpty()) {
- return Collections.emptyList();
- }
-
+ Map<TableBucket, Long> tableBucketsOffset = lakeSnapshotInfo.getTableBucketsOffset();
if (isPartitioned) {
Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
Map<Long, String> partitionNameById =
@@ -103,16 +115,13 @@ public class LakeSplitGenerator {
PartitionInfo::getPartitionId,
PartitionInfo::getPartitionName));
return generatePartitionTableSplit(
- lakeSplits,
- isLogTable,
- lakeSnapshotInfo.getTableBucketsOffset(),
- partitionNameById);
+ lakeSplits, isLogTable, tableBucketsOffset, partitionNameById);
} else {
Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
lakeSplits.values().iterator().next();
// non-partitioned table
return generateNoPartitionedTableSplit(
- nonPartitionLakeSplits, isLogTable, lakeSnapshotInfo.getTableBucketsOffset());
+ nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
}
}
@@ -134,8 +143,7 @@ public class LakeSplitGenerator {
Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
boolean isLogTable,
Map<TableBucket, Long> tableBucketSnapshotLogOffset,
- Map<Long, String> partitionNameById)
- throws Exception {
+ Map<Long, String> partitionNameById) {
List<SourceSplitBase> splits = new ArrayList<>();
Map<String, Long> flussPartitionIdByName =
partitionNameById.entrySet().stream()
@@ -189,7 +197,7 @@ public class LakeSplitGenerator {
// iterate remain fluss splits
for (Map.Entry<String, Long> partitionIdByNameEntry :
flussPartitionIdByName.entrySet()) {
String partitionName = partitionIdByNameEntry.getKey();
- long partitionId = partitionIdByNameEntry.getValue();
+ Long partitionId = partitionIdByNameEntry.getValue();
Map<Integer, Long> bucketEndOffset =
stoppingOffsetInitializer.getBucketOffsets(
partitionName,
@@ -224,7 +232,7 @@ public class LakeSplitGenerator {
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
- long stoppingOffset = bucketEndOffset.get(bucket);
+ Long stoppingOffset = bucketEndOffset.get(bucket);
if (snapshotLogOffset == null) {
// no data committed to this bucket yet, scan from fluss log
splits.add(
@@ -248,7 +256,7 @@ public class LakeSplitGenerator {
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, bucket);
Long snapshotLogOffset = tableBucketSnapshotLogOffset.get(tableBucket);
- long stoppingOffset = bucketEndOffset.get(bucket);
+ Long stoppingOffset = bucketEndOffset.get(bucket);
splits.add(
generateSplitForPrimaryKeyTableBucket(
lakeSplits != null ? lakeSplits.get(bucket) : null,
@@ -267,11 +275,14 @@ public class LakeSplitGenerator {
@Nullable String partitionName,
@Nullable Long partitionId) {
List<SourceSplitBase> splits = new ArrayList<>();
+ // a table bucket may contain multiple lake splits, so we
+ // introduce an index to make each split id unique
+ int index = 0;
for (LakeSplit lakeSplit :
lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList())) {
TableBucket tableBucket =
new TableBucket(tableInfo.getTableId(), partitionId, lakeSplit.bucket());
- splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit));
+ splits.add(new LakeSnapshotSplit(tableBucket, partitionName, lakeSplit, index++));
}
return splits;
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index 88c4e6db8..d4d200ff9 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -48,8 +48,9 @@ public class LakeSplitSerializer {
public void serialize(DataOutputSerializer out, SourceSplitBase split) throws IOException {
if (split instanceof LakeSnapshotSplit) {
- byte[] serializeBytes =
- sourceSplitSerializer.serialize(((LakeSnapshotSplit) split).getLakeSplit());
+ LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
+ out.writeInt(lakeSplit.getSplitIndex());
+ byte[] serializeBytes = sourceSplitSerializer.serialize(lakeSplit.getLakeSplit());
out.writeInt(serializeBytes.length);
out.write(serializeBytes);
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
@@ -89,12 +90,13 @@ public class LakeSplitSerializer {
DataInputDeserializer input)
throws IOException {
if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
+ int splitIndex = input.readInt();
byte[] serializeBytes = new byte[input.readInt()];
input.read(serializeBytes);
- LakeSplit fileStoreSourceSplit =
+ LakeSplit lakeSplit =
sourceSplitSerializer.deserialize(
sourceSplitSerializer.getVersion(), serializeBytes);
- return new LakeSnapshotSplit(tableBucket, partition, fileStoreSourceSplit);
+ return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, splitIndex);
} else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
List<LakeSplit> lakeSplits = null;
if (input.readBoolean()) {
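With the split index written first, a LakeSnapshotSplit payload is now laid out as [splitIndex:int][length:int][lakeSplitBytes] after the common split header. The updated unit test below exercises this round trip; a condensed sketch (fixtures tableBucket, LAKE_SPLIT, and a serializer wired with a lake source are assumed, as in LakeSplitSerializerTest):

    LakeSnapshotSplit original =
            new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 1);
    DataOutputSerializer output = new DataOutputSerializer(1024);
    serializer.serialize(output, original);
    // the split index is written (and read back) before the lake split bytes,
    // so the deserialized split keeps splitIndex == 1 and an identical splitId()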
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index c95bb51d4..0c2d60dbb 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -107,4 +107,23 @@ public class LakeSnapshotAndFlussLogSplit extends SourceSplitBase {
public List<LakeSplit> getLakeSplits() {
return lakeSnapshotSplits;
}
+
+ @Override
+ public String toString() {
+ return "LakeSnapshotAndFlussLogSplit{"
+ + "lakeSnapshotSplits="
+ + lakeSnapshotSplits
+ + ", recordOffset="
+ + recordOffset
+ + ", startingOffset="
+ + startingOffset
+ + ", stoppingOffset="
+ + stoppingOffset
+ + ", tableBucket="
+ + tableBucket
+ + ", partitionName='"
+ + partitionName
+ + '\''
+ + '}';
+ }
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
index 1cc883d3f..b85d990fe 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -32,18 +32,25 @@ public class LakeSnapshotSplit extends SourceSplitBase {
private final long recordsToSplit;
+ private final int splitIndex;
+
public LakeSnapshotSplit(
- TableBucket tableBucket, @Nullable String partitionName, LakeSplit lakeSplit) {
- this(tableBucket, partitionName, lakeSplit, 0);
+ TableBucket tableBucket,
+ @Nullable String partitionName,
+ LakeSplit lakeSplit,
+ int splitIndex) {
+ this(tableBucket, partitionName, lakeSplit, splitIndex, 0);
}
public LakeSnapshotSplit(
TableBucket tableBucket,
@Nullable String partitionName,
LakeSplit lakeSplit,
+ int splitIndex,
long recordsToSplit) {
super(tableBucket, partitionName);
this.lakeSplit = lakeSplit;
+ this.splitIndex = splitIndex;
this.recordsToSplit = recordsToSplit;
}
@@ -55,14 +62,20 @@ public class LakeSnapshotSplit extends SourceSplitBase {
return recordsToSplit;
}
+ public int getSplitIndex() {
+ return splitIndex;
+ }
+
@Override
public String splitId() {
return toSplitId(
- "lake-snapshot-",
- new TableBucket(
- tableBucket.getTableId(),
- tableBucket.getPartitionId(),
- lakeSplit.bucket()));
+ "lake-snapshot-",
+ new TableBucket(
+ tableBucket.getTableId(),
+ tableBucket.getPartitionId(),
+ lakeSplit.bucket()))
+ + "-"
+ + splitIndex;
}
@Override
@@ -74,4 +87,21 @@ public class LakeSnapshotSplit extends SourceSplitBase {
public byte splitKind() {
return LAKE_SNAPSHOT_SPLIT_KIND;
}
+
+ @Override
+ public String toString() {
+ return "LakeSnapshotSplit{"
+ + "lakeSplit="
+ + lakeSplit
+ + ", recordsToSplit="
+ + recordsToSplit
+ + ", splitIndex="
+ + splitIndex
+ + ", tableBucket="
+ + tableBucket
+ + ", partitionName='"
+ + partitionName
+ + '\''
+ + '}';
+ }
}
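Since a single table bucket can yield several lake splits, the appended index is what keeps their split ids distinct. A hedged illustration (bucket and the LakeSplit stubs are assumed fixtures; the exact toSplitId rendering is defined elsewhere):

    LakeSnapshotSplit first = new LakeSnapshotSplit(bucket, null, lakeSplitA, 0);
    LakeSnapshotSplit second = new LakeSnapshotSplit(bucket, null, lakeSplitB, 1);
    // same bucket, different index => ids differ in the trailing "-0" / "-1"
    assert !first.splitId().equals(second.splitId());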
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
index e81358eae..399601b99 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -43,6 +43,7 @@ public class LakeSnapshotSplitState extends SourceSplitState {
split.getTableBucket(),
split.getPartitionName(),
split.getLakeSplit(),
+ split.getSplitIndex(),
recordsToSplit);
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 375d0540f..9687b5efc 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -161,6 +161,7 @@ public class FlinkSource<OUT>
splitEnumeratorContext,
sourceEnumeratorState.getAssignedBuckets(),
sourceEnumeratorState.getAssignedPartitions(),
+ sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
offsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
@@ -175,7 +176,7 @@ public class FlinkSource<OUT>
@Override
public SimpleVersionedSerializer<SourceEnumeratorState> getEnumeratorCheckpointSerializer() {
- return FlussSourceEnumeratorStateSerializer.INSTANCE;
+ return new FlussSourceEnumeratorStateSerializer(lakeSource);
}
@Override
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e63d286d0..867ff442f 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -111,6 +111,8 @@ public class FlinkSourceEnumerator
/** buckets that have been assigned to readers. */
private final Set<TableBucket> assignedTableBuckets;
+ @Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
+
private final long scanPartitionDiscoveryIntervalMs;
private final boolean streaming;
@@ -168,7 +170,7 @@ public class FlinkSourceEnumerator
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
List<FieldEqual> partitionFilters,
- LakeSource<LakeSplit> lakeSource) {
+ @Nullable LakeSource<LakeSplit> lakeSource) {
this(
tablePath,
flussConf,
@@ -177,6 +179,7 @@ public class FlinkSourceEnumerator
context,
Collections.emptySet(),
Collections.emptyMap(),
+ null,
startingOffsetsInitializer,
scanPartitionDiscoveryIntervalMs,
streaming,
@@ -192,6 +195,7 @@ public class FlinkSourceEnumerator
SplitEnumeratorContext<SourceSplitBase> context,
Set<TableBucket> assignedTableBuckets,
Map<Long, String> assignedPartitions,
+ List<SourceSplitBase> pendingHybridLakeFlussSplits,
OffsetsInitializer startingOffsetsInitializer,
long scanPartitionDiscoveryIntervalMs,
boolean streaming,
@@ -206,6 +210,10 @@ public class FlinkSourceEnumerator
this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
this.startingOffsetsInitializer = startingOffsetsInitializer;
this.assignedPartitions = new HashMap<>(assignedPartitions);
+ this.pendingHybridLakeFlussSplits =
+ pendingHybridLakeFlussSplits == null
+ ? null
+ : new LinkedList<>(pendingHybridLakeFlussSplits);
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
this.streaming = streaming;
this.partitionFilters = checkNotNull(partitionFilters);
@@ -230,22 +238,29 @@ public class FlinkSourceEnumerator
}
if (isPartitioned) {
- if (streaming && scanPartitionDiscoveryIntervalMs > 0) {
- // should do partition discovery
- LOG.info(
- "Starting the FlussSourceEnumerator for table {} "
- + "with new partition discovery interval of {}
ms.",
- tablePath,
- scanPartitionDiscoveryIntervalMs);
- // discover new partitions and handle new partitions
- context.callAsync(
- this::listPartitions,
- this::checkPartitionChanges,
- 0,
- scanPartitionDiscoveryIntervalMs);
- } else {
- if (!streaming) {
- startInBatchMode();
+ if (streaming) {
+ if (lakeSource != null) {
+ // we'll need to consider lake splits
+ List<SourceSplitBase> hybridLakeFlussSplits = generateHybridLakeFlussSplits();
+ if (hybridLakeFlussSplits != null) {
+ // handle hybrid lake fluss splits first
+ handleSplitsAdd(hybridLakeFlussSplits, null);
+ }
+ }
+
+ if (scanPartitionDiscoveryIntervalMs > 0) {
+ // should do partition discovery
+ LOG.info(
+ "Starting the FlussSourceEnumerator for table {} "
+ + "with new partition discovery interval
of {} ms.",
+ tablePath,
+ scanPartitionDiscoveryIntervalMs);
+ // discover new partitions and handle new partitions
+ context.callAsync(
+ this::listPartitions,
+ this::checkPartitionChanges,
+ 0,
+ scanPartitionDiscoveryIntervalMs);
} else {
// just call once
LOG.info(
@@ -253,21 +268,30 @@ public class FlinkSourceEnumerator
tablePath);
context.callAsync(this::listPartitions, this::checkPartitionChanges);
}
+ } else {
+ startInBatchMode();
}
-
} else {
- if (!streaming) {
- startInBatchMode();
+ if (streaming) {
+ startInStreamModeForNonPartitionedTable();
} else {
- // init bucket splits and assign
- context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
+ startInBatchMode();
}
}
}
private void startInBatchMode() {
if (lakeEnabled) {
- context.callAsync(this::getLakeSplit, this::handleSplitsAdd);
+ context.callAsync(
+ () -> {
+ List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
+ if (splits == null) {
+ throw new UnsupportedOperationException(
+ "Currently, Batch mode can only be
supported if one lake snapshot exists for the table.");
+ }
+ return splits;
+ },
+ this::handleSplitsAdd);
} else {
throw new UnsupportedOperationException(
String.format(
@@ -276,6 +300,26 @@ public class FlinkSourceEnumerator
}
}
+ private void startInStreamModeForNonPartitionedTable() {
+ if (lakeSource != null) {
+ context.callAsync(
+ () -> {
+ // first, try to generate hybrid lake splits
+ List<SourceSplitBase> splits = generateHybridLakeFlussSplits();
+ // if splits is null, fall back to the normal
+ // fluss split generation logic
+ if (splits == null) {
+ splits = this.initNonPartitionedSplits();
+ }
+ return splits;
+ },
+ this::handleSplitsAdd);
+ } else {
+ // init bucket splits and assign
+ context.callAsync(this::initNonPartitionedSplits, this::handleSplitsAdd);
+ }
+ }
+
private List<SourceSplitBase> initNonPartitionedSplits() {
if (hasPrimaryKey && startingOffsetsInitializer instanceof SnapshotOffsetsInitializer) {
// get the table snapshot info
@@ -294,6 +338,9 @@ public class FlinkSourceEnumerator
}
private Set<PartitionInfo> listPartitions() {
+ if (closed) {
+ return Collections.emptySet();
+ }
try {
List<PartitionInfo> partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
partitionInfos = applyPartitionFilter(partitionInfos);
@@ -512,17 +559,30 @@ public class FlinkSourceEnumerator
return splits;
}
- private List<SourceSplitBase> getLakeSplit() throws Exception {
- LakeSplitGenerator lakeSplitGenerator =
- new LakeSplitGenerator(
- tableInfo,
- flussAdmin,
- lakeSource,
- bucketOffsetsRetriever,
- stoppingOffsetsInitializer,
- tableInfo.getNumBuckets(),
- this::listPartitions);
- return lakeSplitGenerator.generateHybridLakeSplits();
+ @Nullable
+ private List<SourceSplitBase> generateHybridLakeFlussSplits() {
+ // pending lake fluss splits still exist, restored from a
+ // checkpoint; we shouldn't list splits again
+ if (pendingHybridLakeFlussSplits != null) {
+ return pendingHybridLakeFlussSplits;
+ }
+ try {
+ LakeSplitGenerator lakeSplitGenerator =
+ new LakeSplitGenerator(
+ tableInfo,
+ flussAdmin,
+ lakeSource,
+ bucketOffsetsRetriever,
+ stoppingOffsetsInitializer,
+ tableInfo.getNumBuckets(),
+ this::listPartitions);
+ pendingHybridLakeFlussSplits = lakeSplitGenerator.generateHybridLakeFlussSplits();
+ return pendingHybridLakeFlussSplits;
+ } catch (Exception e) {
+ LOG.error("Failed to get hybrid lake fluss splits, won't take
splits in lake.", e);
+ }
+ return null;
}
private boolean ignoreTableBucket(TableBucket tableBucket) {
@@ -619,6 +679,16 @@ public class FlinkSourceEnumerator
TableBucket tableBucket = split.getTableBucket();
assignedTableBuckets.add(tableBucket);
+ if (pendingHybridLakeFlussSplits != null) {
+ // remove it from pendingHybridLakeFlussSplits
+ // since this split has already been assigned
+ pendingHybridLakeFlussSplits.removeIf(
+ hybridLakeFlussSplit ->
+ hybridLakeFlussSplit
+ .splitId()
+ .equals(split.splitId()));
+ }
+
if (isPartitioned) {
long partitionId =
checkNotNull(
@@ -755,7 +825,8 @@ public class FlinkSourceEnumerator
@Override
public SourceEnumeratorState snapshotState(long checkpointId) {
final SourceEnumeratorState enumeratorState =
- new SourceEnumeratorState(assignedTableBuckets, assignedPartitions);
+ new SourceEnumeratorState(
+ assignedTableBuckets, assignedPartitions, pendingHybridLakeFlussSplits);
LOG.debug("Source Checkpoint is {}", enumeratorState);
return enumeratorState;
}
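Taken together, streaming jobs now follow one flow: generate the hybrid lake + fluss splits once (or reuse the pending list restored from a checkpoint), drop each split from the pending list as it is assigned, and snapshot whatever is still pending. A condensed sketch of the lifecycle (stage labels are descriptive, not actual method names):

    // start:    pending == null -> list the lake snapshot and build hybrid splits;
    //           pending != null -> restored from a checkpoint, reuse as-is
    List<SourceSplitBase> splits = generateHybridLakeFlussSplits(); // may be null
    // assign:   pendingHybridLakeFlussSplits.removeIf(s -> s.splitId().equals(assignedId));
    // snapshot: new SourceEnumeratorState(assignedBuckets, assignedPartitions, pending);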
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index b20fc1757..24a1a11bd 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -20,8 +20,8 @@ package org.apache.fluss.flink.source.enumerator.initializer;
import javax.annotation.Nullable;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* An implementation of {@link OffsetsInitializer} which does not initialize anything.
@@ -37,6 +37,6 @@ public class NoStoppingOffsetsInitializer implements OffsetsInitializer {
@Nullable String partitionName,
Collection<Integer> buckets,
OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever) {
- return Collections.emptyMap();
+ return buckets.stream().collect(Collectors.toMap(x -> x, x -> Long.MAX_VALUE));
}
}
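Returning Long.MAX_VALUE for every bucket, instead of an empty map, gives each bucket an explicit "never stop" offset that the hybrid split generation above can carry on streaming splits. A small sketch of the new contract (the retriever argument is unused by this initializer, so null is passed purely for illustration):

    Map<Integer, Long> offsets =
            new NoStoppingOffsetsInitializer()
                    .getBucketOffsets(null, Arrays.asList(0, 1, 2), null);
    // => each bucket maps to Long.MAX_VALUE: {0=9223372036854775807, 1=..., 2=...}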
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index 559b7e35e..512441c9e 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -17,15 +17,23 @@
package org.apache.fluss.flink.source.state;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.flink.source.split.SourceSplitSerializer;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
import org.apache.fluss.metadata.TableBucket;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
+import javax.annotation.Nullable;
+
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,8 +41,7 @@ import java.util.Set;
public class FlussSourceEnumeratorStateSerializer
implements SimpleVersionedSerializer<SourceEnumeratorState> {
- public static final FlussSourceEnumeratorStateSerializer INSTANCE =
- new FlussSourceEnumeratorStateSerializer();
+ @Nullable private final LakeSource<LakeSplit> lakeSource;
private static final int VERSION_0 = 0;
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -42,6 +49,10 @@ public class FlussSourceEnumeratorStateSerializer
private static final int CURRENT_VERSION = VERSION_0;
+ public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> lakeSource) {
+ this.lakeSource = lakeSource;
+ }
+
@Override
public int getVersion() {
return CURRENT_VERSION;
@@ -73,6 +84,10 @@ public class FlussSourceEnumeratorStateSerializer
out.writeUTF(entry.getValue());
}
+ if (lakeSource != null) {
+ serializeRemainingHybridLakeFlussSplits(out, state);
+ }
+
final byte[] result = out.getCopyOfBuffer();
out.clear();
return result;
@@ -107,6 +122,56 @@ public class FlussSourceEnumeratorStateSerializer
String partition = in.readUTF();
assignedPartitions.put(partitionId, partition);
}
- return new SourceEnumeratorState(assignedBuckets, assignedPartitions);
+
+ List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
+ if (lakeSource != null) {
+ // todo: add a unit test for serializing remaining hybrid lake fluss splits
+ remainingHybridLakeFlussSplits = deserializeRemainingHybridLakeFlussSplits(in);
+ }
+
+ return new SourceEnumeratorState(
+ assignedBuckets, assignedPartitions, remainingHybridLakeFlussSplits);
+ }
+
+ private void serializeRemainingHybridLakeFlussSplits(
+ final DataOutputSerializer out, SourceEnumeratorState state) throws IOException {
+ List<SourceSplitBase> remainingHybridLakeFlussSplits =
+ state.getRemainingHybridLakeFlussSplits();
+ if (remainingHybridLakeFlussSplits != null) {
+ // write that hybrid lake fluss splits is not null
+ out.writeBoolean(true);
+ out.writeInt(remainingHybridLakeFlussSplits.size());
+ SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+ for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
+ byte[] serializeBytes = sourceSplitSerializer.serialize(split);
+ out.writeInt(serializeBytes.length);
+ out.write(serializeBytes);
+ }
+
+ } else {
+ // write that hybrid lake fluss splits is null
+ out.writeBoolean(false);
+ }
+ }
+
+ @Nullable
+ private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
+ final DataInputDeserializer in) throws IOException {
+ if (in.readBoolean()) {
+ int numSplits = in.readInt();
+ List<SourceSplitBase> splits = new ArrayList<>(numSplits);
+ SourceSplitSerializer sourceSplitSerializer = new SourceSplitSerializer(lakeSource);
+ for (int i = 0; i < numSplits; i++) {
+ int splitSizeInBytes = in.readInt();
+ byte[] splitBytes = new byte[splitSizeInBytes];
+ in.readFully(splitBytes);
+ splits.add(
+ sourceSplitSerializer.deserialize(
+ sourceSplitSerializer.getVersion(), splitBytes));
+ }
+ return splits;
+ } else {
+ return null;
+ }
}
}
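When no lake source is configured, the trailing splits section is never written or read, so lake-less jobs keep the old state layout. A minimal round-trip sketch (state fields assumed in scope, mirroring SourceEnumeratorStateSerializerTest below):

    FlussSourceEnumeratorStateSerializer serializer =
            new FlussSourceEnumeratorStateSerializer(lakeSource); // lakeSource may be null
    byte[] bytes =
            serializer.serialize(
                    new SourceEnumeratorState(assignedBuckets, assignedPartitions, pendingSplits));
    SourceEnumeratorState restored = serializer.deserialize(serializer.getVersion(), bytes);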
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index ab5598963..d804d069b 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -17,8 +17,12 @@
package org.apache.fluss.flink.source.state;
+import org.apache.fluss.flink.source.split.SourceSplitBase;
import org.apache.fluss.metadata.TableBucket;
+import javax.annotation.Nullable;
+
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -33,10 +37,17 @@ public class SourceEnumeratorState {
// mapping from partition id to partition name
private final Map<Long, String> assignedPartitions;
+ // the unassigned lake splits of a lake snapshot and the fluss splits
+ // based on the lake snapshot
+ @Nullable private final List<SourceSplitBase> remainingHybridLakeFlussSplits;
+
public SourceEnumeratorState(
- Set<TableBucket> assignedBuckets, Map<Long, String> assignedPartitions) {
+ Set<TableBucket> assignedBuckets,
+ Map<Long, String> assignedPartitions,
+ @Nullable List<SourceSplitBase> remainingHybridLakeFlussSplits) {
this.assignedBuckets = assignedBuckets;
this.assignedPartitions = assignedPartitions;
+ this.remainingHybridLakeFlussSplits = remainingHybridLakeFlussSplits;
}
public Set<TableBucket> getAssignedBuckets() {
@@ -47,6 +58,11 @@ public class SourceEnumeratorState {
return assignedPartitions;
}
+ @Nullable
+ public List<SourceSplitBase> getRemainingHybridLakeFlussSplits() {
+ return remainingHybridLakeFlussSplits;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 42ea11922..7d6a3b31a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -104,6 +104,8 @@ public class TieringSourceEnumerator
private TieringSplitGenerator splitGenerator;
private int flussCoordinatorEpoch;
+ private volatile boolean closed = false;
+
public TieringSourceEnumerator(
Configuration flussConf,
SplitEnumeratorContext<TieringSplit> context,
@@ -254,6 +256,9 @@ public class TieringSourceEnumerator
}
private @Nullable Tuple3<Long, Long, TablePath> requestTieringTableSplitsViaHeartBeat() {
+ if (closed) {
+ return null;
+ }
Map<Long, Long> currentFinishedTableEpochs = new HashMap<>(this.finishedTableEpochs);
Map<Long, Long> currentFailedTableEpochs = new HashMap<>(this.failedTableEpochs);
LakeTieringHeartbeatRequest tieringHeartbeatRequest =
@@ -342,6 +347,7 @@ public class TieringSourceEnumerator
@Override
public void close() throws IOException {
+ closed = true;
if (rpcClient != null) {
failedTableEpochs.putAll(tieringTableEpochs);
tieringTableEpochs.clear();
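The volatile closed flag is a guard for callbacks scheduled through context.callAsync, which can fire while close() is running; they check the flag and return a no-op result instead of touching the RPC client. A generic sketch of the pattern (class shape and names are illustrative, not the actual enumerator):

    private volatile boolean closed = false;

    private List<WorkItem> poll() {          // invoked via context.callAsync
        if (closed) {
            return Collections.emptyList();  // enumerator is shutting down
        }
        return fetchViaRpc();                // hypothetical RPC call
    }

    @Override
    public void close() {
        closed = true;                       // flip first, then release resources
    }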
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index ff69bce92..15bb5d7d0 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -62,7 +62,7 @@ class LakeSplitSerializerTest {
void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
// Prepare test data
LakeSnapshotSplit originalSplit =
- new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT);
+ new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 1);
DataOutputSerializer output = new DataOutputSerializer(STOPPING_OFFSET);
serializer.serialize(output, originalSplit);
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 4b6db5026..7218f3891 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -353,6 +353,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
context,
assignedBuckets,
Collections.emptyMap(),
+ Collections.emptyList(),
OffsetsInitializer.earliest(),
DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
streaming,
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index f5f39b331..6a413f80d 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -37,7 +38,7 @@ class SourceEnumeratorStateSerializerTest {
@Test
void testPendingSplitsCheckpointSerde() throws Exception {
FlussSourceEnumeratorStateSerializer serializer =
- FlussSourceEnumeratorStateSerializer.INSTANCE;
+ new FlussSourceEnumeratorStateSerializer(null);
Set<TableBucket> assignedBuckets =
new HashSet<>(Arrays.asList(new TableBucket(1, 0), new TableBucket(1, 4L, 1)));
@@ -46,7 +47,8 @@ class SourceEnumeratorStateSerializerTest {
assignedPartitions.put(2L, "partition2");
SourceEnumeratorState sourceEnumeratorState =
- new SourceEnumeratorState(assignedBuckets, assignedPartitions);
+ new SourceEnumeratorState(
+ assignedBuckets, assignedPartitions, Collections.emptyList());
// serialize assigned buckets
byte[] serialized = serializer.serialize(sourceEnumeratorState);
diff --git a/fluss-lake/fluss-lake-paimon/pom.xml b/fluss-lake/fluss-lake-paimon/pom.xml
index 57b154460..c1ee02f42 100644
--- a/fluss-lake/fluss-lake-paimon/pom.xml
+++ b/fluss-lake/fluss-lake-paimon/pom.xml
@@ -164,6 +164,7 @@
<artifactId>fluss-flink-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
+ <type>test-jar</type>
</dependency>
<dependency>
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 426969941..0b530a722 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.row.TimestampNtz;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
@@ -36,10 +37,12 @@ import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -122,6 +125,50 @@ class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "stream_logTable_" + (isPartitioned ? "partitioned"
: "non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // now, start to read the log table, which will read paimon and
+ // may read fluss or not, depending on the log offset of the paimon snapshot
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " + tableName).collect();
+ assertResultsIgnoreOrder(
+ actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+
+ // write some log data again
+ writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+ // query the log table again and check the data
+ // it should read both paimon snapshot and fluss log
+ actual =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+ if (isPartitioned) {
+ // we write to a new partition to verify partition discovery
+ writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+ }
+ assertResultsIgnoreOrder(
+ actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+ }
+
private long prepareLogTable(
TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
throws Exception {
@@ -129,7 +176,8 @@ class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
long t1Id = createFullTypeLogTable(tablePath, bucketNum, isPartitioned);
if (isPartitioned) {
Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
- for (String partition : partitionNameById.values()) {
+ for (String partition :
+ partitionNameById.values().stream().sorted().collect(Collectors.toList())) {
for (int i = 0; i < 3; i++) {
flinkRows.addAll(writeFullTypeRows(tablePath, 10, partition));
}
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 293e48107..1ad885da9 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -32,6 +32,7 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
protected static final int DEFAULT_BUCKET_NUM = 1;
StreamTableEnvironment batchTEnv;
+ StreamTableEnvironment streamTEnv;
@BeforeAll
protected static void beforeAll() {
@@ -41,6 +42,24 @@ public class FlinkUnionReadTestBase extends FlinkPaimonTieringTestBase {
@BeforeEach
public void beforeEach() {
super.beforeEach();
+ buildBatchTEnv();
+ buildStreamTEnv();
+ }
+
+ private void buildStreamTEnv() {
+ String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
+ // create catalog using sql
+ streamTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+ streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+ streamTEnv.executeSql("use " + DEFAULT_DB);
+ }
+
+ public void buildBatchTEnv() {
String bootstrapServers = String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
// create table environment
batchTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inBatchMode());