This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5c8f78180 [flink] Union read support primary key table in streaming
mode (#1613)
5c8f78180 is described below
commit 5c8f78180ed16f08dc020b2e95847a206c7c533b
Author: CaoZhen <[email protected]>
AuthorDate: Fri Sep 5 16:00:46 2025 +0800
[flink] Union read support primary key table in streaming mode (#1613)
---
.../fluss/flink/lake/LakeRecordRecordEmitter.java | 22 +-
.../fluss/flink/lake/LakeSplitGenerator.java | 4 +-
.../fluss/flink/lake/LakeSplitReaderGenerator.java | 32 ++-
.../fluss/flink/lake/LakeSplitSerializer.java | 5 +-
.../reader/IndexedLakeSplitRecordIterator.java | 70 +++++++
.../reader/SeekableLakeSnapshotSplitScanner.java | 107 ++++++++++
.../lake/split/LakeSnapshotAndFlussLogSplit.java | 49 +++--
.../state/LakeSnapshotAndFlussLogSplitState.java | 16 +-
.../initializer/NoStoppingOffsetsInitializer.java | 4 +-
.../flink/source/reader/BoundedSplitReader.java | 17 +-
.../source/reader/FlinkSourceSplitReader.java | 18 +-
.../flink/source/reader/MutableRecordAndPos.java | 7 +-
.../fluss/flink/source/reader/RecordAndPos.java | 31 ++-
.../tiering/committer/TieringCommitOperator.java | 13 ++
.../flink/tiering/event/TieringRestoreEvent.java | 26 +++
.../source/enumerator/TieringSourceEnumerator.java | 10 +
.../apache/fluss/flink/utils/LakeSourceUtils.java | 11 +-
.../fluss/flink/lake/LakeSplitSerializerTest.java | 2 +-
.../flink/source/reader/RecordAndPosTest.java | 6 +-
.../source/testutils/FlinkRowAssertionsUtils.java | 15 +-
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 231 ++++++++++++++++++++-
fluss-test-coverage/pom.xml | 1 +
22 files changed, 648 insertions(+), 49 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
index 147043fdc..031444501 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -44,8 +44,26 @@ public class LakeRecordRecordEmitter<OUT> {
((LakeSnapshotSplitState)
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
} else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
- ((LakeSnapshotAndFlussLogSplitState) splitState)
- .setRecordsToSkip(recordAndPos.readRecordsCount());
+ LakeSnapshotAndFlussLogSplitState
lakeSnapshotAndFlussLogSplitState =
+ (LakeSnapshotAndFlussLogSplitState) splitState;
+
+ // set current split index
+ lakeSnapshotAndFlussLogSplitState.setCurrentLakeSplitIndex(
+ recordAndPos.getCurrentSplitIndex());
+
+ // set records to skip to state
+ if (recordAndPos.readRecordsCount() > 0) {
+
lakeSnapshotAndFlussLogSplitState.setRecordsToSkip(recordAndPos.readRecordsCount());
+ }
+
+ ScanRecord scanRecord = recordAndPos.record();
+ // todo: may need a state to mark snapshot phase is finished
+ // just like what we do for HybridSnapshotLogSplitState
+ if (scanRecord.logOffset() >= 0) {
+ // record is with a valid offset, means it's in incremental
phase,
+ // update the log offset
+
lakeSnapshotAndFlussLogSplitState.setNextLogOffset(scanRecord.logOffset() + 1);
+ }
sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
} else {
throw new UnsupportedOperationException(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index e420de630..cd0926294 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
import static
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
/** A generator for lake splits. */
@@ -240,7 +241,8 @@ public class LakeSplitGenerator {
tableBucket, partitionName,
EARLIEST_OFFSET, stoppingOffset));
} else {
// need to read remain fluss log
- if (snapshotLogOffset < stoppingOffset) {
+ if (stoppingOffset == NO_STOPPING_OFFSET
+ || snapshotLogOffset < stoppingOffset) {
splits.add(
new LogSplit(
tableBucket,
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 36958736a..70734a490 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -18,8 +18,10 @@
package org.apache.fluss.flink.lake;
import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
import org.apache.fluss.flink.lake.reader.LakeSnapshotScanner;
+import org.apache.fluss.flink.lake.reader.SeekableLakeSnapshotSplitScanner;
import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
import org.apache.fluss.flink.source.reader.BoundedSplitReader;
@@ -29,6 +31,7 @@ import org.apache.fluss.lake.source.LakeSplit;
import javax.annotation.Nullable;
+import java.util.Collections;
import java.util.Queue;
/** A generator to generate reader for lake split. */
@@ -67,17 +70,30 @@ public class LakeSplitReaderGenerator {
return new BoundedSplitReader(
lakeSnapshotScanner,
lakeSnapshotSplit.getRecordsToSplit());
} else if (split instanceof LakeSnapshotAndFlussLogSplit) {
- LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
- (LakeSnapshotAndFlussLogSplit) split;
- LakeSnapshotAndLogSplitScanner lakeSnapshotAndLogSplitScanner =
- new LakeSnapshotAndLogSplitScanner(
- table, lakeSource, lakeSnapshotAndFlussLogSplit,
projectedFields);
- return new BoundedSplitReader(
- lakeSnapshotAndLogSplitScanner,
- lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
+ LakeSnapshotAndFlussLogSplit lakeSplit =
(LakeSnapshotAndFlussLogSplit) split;
+ return new BoundedSplitReader(getBatchScanner(lakeSplit),
lakeSplit.getRecordsToSkip());
} else {
throw new UnsupportedOperationException(
String.format("The split type of %s is not supported.",
split.getClass()));
}
}
+
+ private BatchScanner getBatchScanner(LakeSnapshotAndFlussLogSplit
lakeSplit) {
+ BatchScanner lakeBatchScanner;
+ if (lakeSplit.isStreaming()) {
+ lakeBatchScanner =
+ new SeekableLakeSnapshotSplitScanner(
+ lakeSource,
+ lakeSplit.getLakeSplits() == null
+ ? Collections.emptyList()
+ : lakeSplit.getLakeSplits(),
+ lakeSplit.getCurrentLakeSplitIndex());
+
+ } else {
+ lakeBatchScanner =
+ new LakeSnapshotAndLogSplitScanner(
+ table, lakeSource, lakeSplit, projectedFields);
+ }
+ return lakeBatchScanner;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index d4d200ff9..af2eeb2ad 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -77,6 +77,7 @@ public class LakeSplitSerializer {
.getStoppingOffset()
.orElse(LogSplit.NO_STOPPING_OFFSET));
out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
+
out.writeInt(lakeSnapshotAndFlussLogSplit.getCurrentLakeSplitIndex());
} else {
throw new UnsupportedOperationException(
"Unsupported split type: " + split.getClass().getName());
@@ -113,13 +114,15 @@ public class LakeSplitSerializer {
long startingOffset = input.readLong();
long stoppingOffset = input.readLong();
long recordsToSkip = input.readLong();
+ int splitIndex = input.readInt();
return new LakeSnapshotAndFlussLogSplit(
tableBucket,
partition,
lakeSplits,
startingOffset,
stoppingOffset,
- recordsToSkip);
+ recordsToSkip,
+ splitIndex);
} else {
throw new UnsupportedOperationException("Unsupported split kind: "
+ splitKind);
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
new file mode 100644
index 000000000..4af2b5add
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.lake.reader;
+
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * An iterator wrapper that converts LogRecord objects to InternalRow objects
while tracking the
+ * current LakeSplit index being processed.
+ *
+ * <p>This class serves as an adapter between the underlying LogRecord
iterator and the InternalRow
+ * interface expected by consumers. It maintains reference to the specific
LakeSplit index that is
+ * currently being iterated.
+ *
+ * <p>Primary responsibilities:
+ *
+ * <ul>
+ * <li>Wraps a LogRecord iterator and exposes InternalRow objects
+ * <li>Preserves the index of the LakeSplit being processed
+ * <li>Provides clean resource management through Closeable interface
+ * <li>Maintains iterator semantics for sequential data access
+ * </ul>
+ */
+public class IndexedLakeSplitRecordIterator implements
CloseableIterator<InternalRow> {
+ private final CloseableIterator<LogRecord> logRecordIterators;
+ private final int currentLakeSplitIndex;
+
+ public IndexedLakeSplitRecordIterator(
+ CloseableIterator<LogRecord> logRecordIterators, int
currentLakeSplitIndex) {
+ this.logRecordIterators = logRecordIterators;
+ this.currentLakeSplitIndex = currentLakeSplitIndex;
+ }
+
+ public int getCurrentLakeSplitIndex() {
+ return currentLakeSplitIndex;
+ }
+
+ @Override
+ public void close() {
+ logRecordIterators.close();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return logRecordIterators.hasNext();
+ }
+
+ @Override
+ public InternalRow next() {
+ return logRecordIterators.next().getRow();
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
new file mode 100644
index 000000000..b0df3c174
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.lake.reader;
+
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * A scanner that supports seeking to a specific LakeSplit and reading from
that point.
+ *
+ * <p>This scanner enables direct positioning to any LakeSplit in the list
using its index, and then
+ * sequentially reads data starting from that specific split. It provides
fine-grained control over
+ * where to begin the scanning process.
+ *
+ * <p>Key capabilities:
+ *
+ * <ul>
+ * <li>Direct seeking to any LakeSplit by index
+ * <li>Sequential reading starting from the sought split
+ * <li>Precise positioning within the split collection
+ * <li>Resumable scanning from arbitrary positions
+ * </ul>
+ */
+public class SeekableLakeSnapshotSplitScanner implements BatchScanner {
+
+ private final List<LakeSplit> lakeSplits;
+ private int nextLakeSplitIndex;
+ private final LakeSource<LakeSplit> lakeSource;
+ private CloseableIterator<InternalRow> currentLakeRecordIterator;
+
+ public SeekableLakeSnapshotSplitScanner(
+ LakeSource<LakeSplit> lakeSource,
+ List<LakeSplit> lakeSplits,
+ int currentLakeSplitIndex) {
+ // add lake splits
+ this.lakeSplits = lakeSplits;
+ this.nextLakeSplitIndex = currentLakeSplitIndex;
+ this.lakeSource = lakeSource;
+ }
+
+ @Nullable
+ @Override
+ public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws
IOException {
+ if (currentLakeRecordIterator == null) {
+ updateCurrentIterator();
+ }
+
+ // has no next record in currentIterator, update currentIterator
+ if (currentLakeRecordIterator != null &&
!currentLakeRecordIterator.hasNext()) {
+ updateCurrentIterator();
+ }
+
+ return currentLakeRecordIterator != null &&
currentLakeRecordIterator.hasNext()
+ ? currentLakeRecordIterator
+ : null;
+ }
+
+ private void updateCurrentIterator() throws IOException {
+ if (lakeSplits.size() <= nextLakeSplitIndex) {
+ currentLakeRecordIterator = null;
+ } else {
+ int currentLakeSplitIndex = nextLakeSplitIndex;
+ LakeSplit split = lakeSplits.get(currentLakeSplitIndex);
+ CloseableIterator<LogRecord> lakeRecords =
+ lakeSource
+
.createRecordReader((LakeSource.ReaderContext<LakeSplit>) () -> split)
+ .read();
+ currentLakeRecordIterator =
+ new IndexedLakeSplitRecordIterator(lakeRecords,
currentLakeSplitIndex);
+ nextLakeSplitIndex += 1;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (currentLakeRecordIterator != null) {
+ currentLakeRecordIterator.close();
+ currentLakeRecordIterator = null;
+ }
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index 0c2d60dbb..74d549f22 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -32,31 +32,27 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
public static final byte LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
- // may be null when no snapshot data for the bucket
+ // may be null when no lake snapshot data for the bucket
@Nullable private final List<LakeSplit> lakeSnapshotSplits;
- /** The records to skip when reading the splits. */
- private long recordOffset = 0;
- // TODO: Support skip read file by record fileOffset
+ /**
+ * The index of the current split in the lake snapshot splits to be read,
enable to skip read
+ * lake split via this lake split index.
+ */
+ private int currentLakeSplitIndex;
+ /** The records to skip when reading a split. */
+ private long recordOffset;
- private final long startingOffset;
+ private long startingOffset;
private final long stoppingOffset;
- public LakeSnapshotAndFlussLogSplit(
- TableBucket tableBucket,
- @Nullable List<LakeSplit> snapshotSplits,
- long startingOffset,
- long stoppingOffset) {
- this(tableBucket, null, snapshotSplits, startingOffset,
stoppingOffset, 0);
- }
-
public LakeSnapshotAndFlussLogSplit(
TableBucket tableBucket,
@Nullable String partitionName,
@Nullable List<LakeSplit> snapshotSplits,
long startingOffset,
long stoppingOffset) {
- this(tableBucket, partitionName, snapshotSplits, startingOffset,
stoppingOffset, 0);
+ this(tableBucket, partitionName, snapshotSplits, startingOffset,
stoppingOffset, 0, 0);
}
public LakeSnapshotAndFlussLogSplit(
@@ -65,12 +61,14 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
@Nullable List<LakeSplit> snapshotSplits,
long startingOffset,
long stoppingOffset,
- long recordsToSkip) {
+ long recordsToSkip,
+ int currentLakeSplitIndex) {
super(tableBucket, partitionName);
this.lakeSnapshotSplits = snapshotSplits;
this.startingOffset = startingOffset;
this.stoppingOffset = stoppingOffset;
this.recordOffset = recordsToSkip;
+ this.currentLakeSplitIndex = currentLakeSplitIndex;
}
public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long
recordsToSkip) {
@@ -78,6 +76,16 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
return this;
}
+ public LakeSnapshotAndFlussLogSplit updateWithCurrentLakeSplitIndex(int
currentLakeSplitIndex) {
+ this.currentLakeSplitIndex = currentLakeSplitIndex;
+ return this;
+ }
+
+ public LakeSnapshotAndFlussLogSplit updateWithStartingOffset(long
startingOffset) {
+ this.startingOffset = startingOffset;
+ return this;
+ }
+
public long getRecordsToSkip() {
return recordOffset;
}
@@ -95,6 +103,10 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
return true;
}
+ public boolean isStreaming() {
+ return !getStoppingOffset().isPresent();
+ }
+
protected byte splitKind() {
return LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
}
@@ -104,10 +116,15 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
return toSplitId("lake-hybrid-snapshot-log-", tableBucket);
}
+ @Nullable
public List<LakeSplit> getLakeSplits() {
return lakeSnapshotSplits;
}
+ public int getCurrentLakeSplitIndex() {
+ return currentLakeSplitIndex;
+ }
+
@Override
public String toString() {
return "LakeSnapshotAndFlussLogSplit{"
@@ -115,6 +132,8 @@ public class LakeSnapshotAndFlussLogSplit extends
SourceSplitBase {
+ lakeSnapshotSplits
+ ", recordOffset="
+ recordOffset
+ + ", currentLakeSplitIndex="
+ + currentLakeSplitIndex
+ ", startingOffset="
+ startingOffset
+ ", stoppingOffset="
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
index c7ddf59d1..d46b6575f 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -27,19 +27,33 @@ public class LakeSnapshotAndFlussLogSplitState extends
SourceSplitState {
private long recordsToSkip;
private final LakeSnapshotAndFlussLogSplit split;
+ private int currentLakeSplitIndex;
+ private long nextLogOffset;
public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit
split) {
super(split);
this.recordsToSkip = split.getRecordsToSkip();
this.split = split;
+ this.currentLakeSplitIndex = split.getCurrentLakeSplitIndex();
+ this.nextLogOffset = split.getStartingOffset();
}
public void setRecordsToSkip(long recordsToSkip) {
this.recordsToSkip = recordsToSkip;
}
+ public void setCurrentLakeSplitIndex(int currentLakeSplitIndex) {
+ this.currentLakeSplitIndex = currentLakeSplitIndex;
+ }
+
+ public void setNextLogOffset(long nextOffset) {
+ this.nextLogOffset = nextOffset;
+ }
+
@Override
public SourceSplitBase toSourceSplit() {
- return split.updateWithRecordsToSkip(recordsToSkip);
+ return split.updateWithCurrentLakeSplitIndex(currentLakeSplitIndex)
+ .updateWithRecordsToSkip(recordsToSkip)
+ .updateWithStartingOffset(nextLogOffset);
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index 24a1a11bd..49f0d5ff7 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
+
/**
* An implementation of {@link OffsetsInitializer} which does not initialize
anything.
*
@@ -37,6 +39,6 @@ public class NoStoppingOffsetsInitializer implements
OffsetsInitializer {
@Nullable String partitionName,
Collection<Integer> buckets,
OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever) {
- return buckets.stream().collect(Collectors.toMap(x -> x, x ->
Long.MAX_VALUE));
+ return buckets.stream().collect(Collectors.toMap(x -> x, x ->
NO_STOPPING_OFFSET));
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
index cd8d63405..957a307cd 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.source.reader;
import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.flink.lake.reader.IndexedLakeSplitRecordIterator;
import org.apache.fluss.flink.source.split.SnapshotSplit;
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.utils.CloseableIterator;
@@ -135,9 +136,14 @@ public class BoundedSplitReader implements AutoCloseable {
private static class ScanRecordBatch implements
CloseableIterator<ScanRecord> {
private final CloseableIterator<InternalRow> rowIterator;
+ private int currentSplitIndex;
public ScanRecordBatch(CloseableIterator<InternalRow> rowIterator) {
this.rowIterator = rowIterator;
+ if (rowIterator instanceof IndexedLakeSplitRecordIterator) {
+ currentSplitIndex =
+ ((IndexedLakeSplitRecordIterator)
rowIterator).getCurrentLakeSplitIndex();
+ }
}
@Override
@@ -154,6 +160,10 @@ public class BoundedSplitReader implements AutoCloseable {
public void close() {
rowIterator.close();
}
+
+ public int getCurrentSplitIndex() {
+ return currentSplitIndex;
+ }
}
private class RecordAndPosBatch implements CloseableIterator<RecordAndPos>
{
@@ -163,7 +173,12 @@ public class BoundedSplitReader implements AutoCloseable {
RecordAndPosBatch replace(CloseableIterator<ScanRecord> records) {
this.records = records;
- recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT);
+ if (records instanceof ScanRecordBatch) {
+ int currentSplitIndex = ((ScanRecordBatch)
records).getCurrentSplitIndex();
+ recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT,
currentSplitIndex);
+ } else {
+ recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT);
+ }
return this;
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index b809feca5..fa0cb9e1b 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -27,6 +27,7 @@ import org.apache.fluss.client.table.scanner.log.ScanRecords;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.flink.lake.LakeSplitReaderGenerator;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
@@ -208,6 +209,17 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
subscribeLog(sourceSplitBase,
sourceSplitBase.asLogSplit().getStartingOffset());
} else if (sourceSplitBase.isLakeSplit()) {
getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits);
+ if (sourceSplitBase instanceof LakeSnapshotAndFlussLogSplit) {
+ LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+ (LakeSnapshotAndFlussLogSplit) sourceSplitBase;
+ if (lakeSnapshotAndFlussLogSplit.isStreaming()) {
+ // is streaming split which has no stopping offset, we
need also subscribe
+ // change log
+ subscribeLog(
+ lakeSnapshotAndFlussLogSplit,
+
lakeSnapshotAndFlussLogSplit.getStartingOffset());
+ }
+ }
} else {
throw new UnsupportedOperationException(
String.format(
@@ -503,7 +515,11 @@ public class FlinkSourceSplitReader implements
SplitReader<RecordAndPos, SourceS
private FlinkRecordsWithSplitIds finishCurrentBoundedSplit() throws
IOException {
Set<String> finishedSplits =
currentBoundedSplit instanceof HybridSnapshotLogSplit
- // is hybrid split, not to finish this split
+ || (currentBoundedSplit instanceof
LakeSnapshotAndFlussLogSplit
+ && ((LakeSnapshotAndFlussLogSplit)
currentBoundedSplit)
+ .isStreaming())
+ // is hybrid split, or is lakeAndFlussLog split in
streaming mode,
+ // not to finish this split
// since it remains log to read
? Collections.emptySet()
: Collections.singleton(currentBoundedSplit.splitId());
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
index a6edff4ec..1358730dc 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
@@ -28,11 +28,16 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
public class MutableRecordAndPos extends RecordAndPos {
public MutableRecordAndPos() {
- super(null, NO_READ_RECORDS_COUNT);
+ super(null, NO_READ_RECORDS_COUNT, DEFAULT_SPLIT_INDEX);
}
public void setRecord(ScanRecord scanRecord, long readRecordsCount) {
+ setRecord(scanRecord, readRecordsCount, DEFAULT_SPLIT_INDEX);
+ }
+
+ public void setRecord(ScanRecord scanRecord, long readRecordsCount, int
currentIteratorIndex) {
this.scanRecord = scanRecord;
this.readRecordsCount = readRecordsCount;
+ this.currentSplitIndex = currentIteratorIndex;
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
index 3f48b156d..1cdf93dce 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
@@ -23,9 +23,14 @@ import
org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
import java.util.Objects;
/**
- * A record wrapping a Fluss {@link ScanRecord} and the {@code
readRecordsCount} when the record is
- * from reading snapshot. When the record is from reading log, {@code
readRecordsCount} will always
- * be {@link #NO_READ_RECORDS_COUNT}.
+ * A record wrapping a Fluss {@link ScanRecord}, the {@code readRecordsCount}
when the record is
+ * from reading snapshot, the code {@code currentSplitIndex} where the record
is from when the Flink
+ * source split read by split reader contains multiple splits(splittable
unit), like {@link
+ * org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit} contains
multiple {@link
+ * org.apache.fluss.lake.source.LakeSplit}s.
+ *
+ * <p>When the record is from reading log, {@code readRecordsCount} will
always be {@link
+ * #NO_READ_RECORDS_COUNT}.
*
* <p>The {@code readRecordsCount} defines the point in the snapshot reader
AFTER the record. Record
* processing and updating checkpointed state happens atomically. The position
points to where the
@@ -39,25 +44,38 @@ import java.util.Objects;
public class RecordAndPos {
public static final long NO_READ_RECORDS_COUNT = -1;
+ protected static final int DEFAULT_SPLIT_INDEX = 0;
protected ScanRecord scanRecord;
// the read records count include this record when read this record
protected long readRecordsCount;
+ // the index for the current split that the record is from
+ protected int currentSplitIndex;
+
public RecordAndPos(ScanRecord scanRecord) {
- this(scanRecord, NO_READ_RECORDS_COUNT);
+ this(scanRecord, NO_READ_RECORDS_COUNT, DEFAULT_SPLIT_INDEX);
}
public RecordAndPos(ScanRecord scanRecord, long readRecordsCount) {
+ this(scanRecord, readRecordsCount, DEFAULT_SPLIT_INDEX);
+ }
+
+ public RecordAndPos(ScanRecord scanRecord, long readRecordsCount, int
currentSplitIndex) {
this.scanRecord = scanRecord;
this.readRecordsCount = readRecordsCount;
+ this.currentSplitIndex = currentSplitIndex;
}
public long readRecordsCount() {
return readRecordsCount;
}
+ public int getCurrentSplitIndex() {
+ return currentSplitIndex;
+ }
+
public ScanRecord record() {
return scanRecord;
}
@@ -72,12 +90,13 @@ public class RecordAndPos {
}
RecordAndPos that = (RecordAndPos) o;
return readRecordsCount == that.readRecordsCount
+ && currentSplitIndex == that.currentSplitIndex
&& Objects.equals(scanRecord, that.scanRecord);
}
@Override
public int hashCode() {
- return Objects.hash(scanRecord, readRecordsCount);
+ return Objects.hash(scanRecord, readRecordsCount, currentSplitIndex);
}
@Override
@@ -87,6 +106,8 @@ public class RecordAndPos {
+ scanRecord
+ ", readRecordsCount="
+ readRecordsCount
+ + ", currentSplitIndex="
+ + currentSplitIndex
+ '}';
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index fa52a86cd..d508fb49b 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
import org.apache.fluss.flink.tiering.source.TieringSource;
import org.apache.fluss.lake.committer.BucketOffset;
@@ -43,6 +44,7 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -124,6 +126,17 @@ public class TieringCommitOperator<WriteResult,
Committable>
admin = connection.getAdmin();
}
+ @Override
+ public void initializeState(StateInitializationContext context) throws
Exception {
+ super.initializeState(context);
+ int attemptNumber = getRuntimeContext().getAttemptNumber();
+ if (attemptNumber > 0) {
+ // attempt number is greater than zero, the job must failover
+ operatorEventGateway.sendEventToCoordinator(
+ new SourceEventWrapper(new TieringRestoreEvent()));
+ }
+ }
+
@Override
public void
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
throws Exception {
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
new file mode 100644
index 000000000..77a48f9e7
--- /dev/null
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/** SourceEvent used to represent tiering is restoring. */
+public class TieringRestoreEvent implements SourceEvent {
+ private static final long serialVersionUID = 1L;
+}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 7d6a3b31a..b0b58a092 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -217,6 +218,15 @@ public class TieringSourceEnumerator
}
}
+ if (sourceEvent instanceof TieringRestoreEvent) {
+ LOG.info(
+ "Receiving tiering restore event, mark current tiering
table epoch {} as failed.",
+ tieringTableEpochs);
+ // we need to make all as failed
+ failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+ tieringTableEpochs.clear();
+ }
+
if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
// call one round of heartbeat to notify table has been finished
or failed
this.context.callAsync(
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 0f1796a78..43bcf5fce 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -56,8 +56,15 @@ public class LakeSourceUtils {
Configuration.fromMap(properties)
.get(ConfigOptions.TABLE_DATALAKE_FORMAT)
.toString();
- LakeStoragePlugin lakeStoragePlugin =
- LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+ LakeStoragePlugin lakeStoragePlugin;
+ try {
+ lakeStoragePlugin =
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+ } catch (UnsupportedOperationException e) {
+ LOG.info(
+ "No LakeStoragePlugin can be found for datalake format:
{}, return null to disable reading from lake source.",
+ dataLake);
+ return null;
+ }
LakeStorage lakeStorage =
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
try {
return (LakeSource<LakeSplit>)
lakeStorage.createLakeSource(tablePath);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index 15bb5d7d0..bd6081417 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -54,7 +54,7 @@ class LakeSplitSerializerTest {
private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
new TestSimpleVersionedSerializer();
- private TableBucket tableBucket = new TableBucket(0, 1L, 0);
+ private final TableBucket tableBucket = new TableBucket(0, 1L, 0);
private final LakeSplitSerializer serializer = new
LakeSplitSerializer(sourceSplitSerializer);
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
index 76777fa0c..3a975b3e4 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
@@ -49,8 +49,10 @@ class RecordAndPosTest {
assertThat(recordAndPos2).isNotEqualTo(recordAndPos);
assertThat(recordAndPos.toString())
- .isEqualTo("RecordAndPos{scanRecord=+A(1,null,3)@0,
readRecordsCount=-1}");
+ .isEqualTo(
+ "RecordAndPos{scanRecord=+A(1,null,3)@0,
readRecordsCount=-1, currentSplitIndex=0}");
assertThat(recordAndPos2.toString())
- .isEqualTo("RecordAndPos{scanRecord=+A(1,null,3)@0,
readRecordsCount=3}");
+ .isEqualTo(
+ "RecordAndPos{scanRecord=+A(1,null,3)@0,
readRecordsCount=3, currentSplitIndex=0}");
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 87c7e14c1..5946d906c 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -54,6 +54,17 @@ public class FlinkRowAssertionsUtils {
.containsExactlyInAnyOrderElementsOf(expected);
}
+ public static void assertResultsExactOrder(
+ CloseableIterator<Row> iterator, List<Row> expected, boolean
closeIterator) {
+ List<String> actual = collectRowsWithTimeout(iterator,
expected.size(), closeIterator);
+ assertThat(actual)
+ .as(
+ "Expected %d records but got %d after waiting. Actual
results: %s",
+ expected.size(), actual.size(), actual)
+ .containsExactlyElementsOf(
+
expected.stream().map(Row::toString).collect(Collectors.toList()));
+ }
+
public static void assertQueryResultExactOrder(
TableEnvironment env, String query, List<String> expected) throws
Exception {
try (CloseableIterator<Row> rowIter = env.executeSql(query).collect())
{
@@ -100,7 +111,7 @@ public class FlinkRowAssertionsUtils {
for (int i = 0; i < expectedCount; i++) {
// Wait for next record with timeout
if (!waitForNextWithTimeout(
- iterator, deadlineTimeMs -
System.currentTimeMillis())) {
+ iterator, Math.max(deadlineTimeMs -
System.currentTimeMillis(), 1_000))) {
throw timeoutError(
System.currentTimeMillis() - startTimeMs,
expectedCount, actual.size());
}
@@ -153,9 +164,11 @@ public class FlinkRowAssertionsUtils {
private static boolean waitForNextWithTimeout(
CloseableIterator<Row> iterator, long maxWaitTime) {
CompletableFuture<Boolean> future =
CompletableFuture.supplyAsync(iterator::hasNext);
+ System.out.println("Waiting for " + maxWaitTime + " ms to finish.");
try {
return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
+ System.err.println("Timeout waiting for " + maxWaitTime + " ms to
finish.");
future.cancel(true);
return false;
} catch (Exception e) {
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 038d32a12..4bf3e97cd 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -33,6 +33,8 @@ import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -119,9 +123,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
List<Row> expectedRows = new ArrayList<>();
if (isPartitioned) {
- List<String> partitions = Arrays.asList("2025", "2026");
-
- for (String partition : partitions) {
+ for (String partition : waitUntilPartitions(t1).values()) {
expectedRows.add(
Row.of(
false,
@@ -266,9 +268,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
expectedRows = new ArrayList<>();
if (isPartitioned) {
- List<String> partitions = Arrays.asList("2025", "2026");
-
- for (String partition : partitions) {
+ for (String partition : waitUntilPartitions(t1).values()) {
expectedRows.add(
Row.of(
false,
@@ -373,6 +373,225 @@ class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName =
+ "stream_pk_table_full" + (isPartitioned ? "_partitioned" :
"_non_partitioned");
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId =
+ preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+
+ // wait unit records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+
+ // will read paimon snapshot, should only +I since no change log
+ List<Row> expectedRows = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows.add(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(10), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L,
5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows.add(
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L,
5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ }
+ } else {
+ expectedRows =
+ Arrays.asList(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(10), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273182L),
+
TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L,
6000),
+ new byte[] {1, 2, 3, 4},
+ null),
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+
TimestampLtz.fromEpochMillis(1698235273200L),
+
TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L,
6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ }
+
+ String query = "select * from " + tableName;
+ CloseableIterator<Row> actual = streamTEnv.executeSql(query).collect();
+ assertRowResultsIgnoreOrder(actual, expectedRows, false);
+
+ // stop lake tiering service
+ jobClient.cancel().get();
+
+ // write a row again
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(t1);
+ for (String partition : partitionNameById.values()) {
+ writeFullTypeRow(t1, partition);
+ }
+ } else {
+ writeFullTypeRow(t1, null);
+ }
+
+ // should generate -U & +U
+ List<Row> expectedRows2 = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L,
5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L,
7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ }
+ } else {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new
java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ null));
+ }
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+ } else {
+ assertResultsExactOrder(actual, expectedRows2, true);
+ }
+
+ // query again
+ actual = streamTEnv.executeSql(query).collect();
+ List<Row> totalExpectedRows = new ArrayList<>(expectedRows);
+ totalExpectedRows.addAll(expectedRows2);
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, totalExpectedRows, true);
+ } else {
+ assertResultsExactOrder(actual, totalExpectedRows, true);
+ }
+ }
+
private List<Row> sortedRows(List<Row> rows) {
rows.sort(Comparator.comparing(Row::toString));
return rows;
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index aaa08c07d..f2baea535 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -392,6 +392,7 @@
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
+
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
<exclude>
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
</exclude>