This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c8f78180 [flink] Union read support primary key table in streaming 
mode (#1613)
5c8f78180 is described below

commit 5c8f78180ed16f08dc020b2e95847a206c7c533b
Author: CaoZhen <[email protected]>
AuthorDate: Fri Sep 5 16:00:46 2025 +0800

    [flink] Union read support primary key table in streaming mode (#1613)
---
 .../fluss/flink/lake/LakeRecordRecordEmitter.java  |  22 +-
 .../fluss/flink/lake/LakeSplitGenerator.java       |   4 +-
 .../fluss/flink/lake/LakeSplitReaderGenerator.java |  32 ++-
 .../fluss/flink/lake/LakeSplitSerializer.java      |   5 +-
 .../reader/IndexedLakeSplitRecordIterator.java     |  70 +++++++
 .../reader/SeekableLakeSnapshotSplitScanner.java   | 107 ++++++++++
 .../lake/split/LakeSnapshotAndFlussLogSplit.java   |  49 +++--
 .../state/LakeSnapshotAndFlussLogSplitState.java   |  16 +-
 .../initializer/NoStoppingOffsetsInitializer.java  |   4 +-
 .../flink/source/reader/BoundedSplitReader.java    |  17 +-
 .../source/reader/FlinkSourceSplitReader.java      |  18 +-
 .../flink/source/reader/MutableRecordAndPos.java   |   7 +-
 .../fluss/flink/source/reader/RecordAndPos.java    |  31 ++-
 .../tiering/committer/TieringCommitOperator.java   |  13 ++
 .../flink/tiering/event/TieringRestoreEvent.java   |  26 +++
 .../source/enumerator/TieringSourceEnumerator.java |  10 +
 .../apache/fluss/flink/utils/LakeSourceUtils.java  |  11 +-
 .../fluss/flink/lake/LakeSplitSerializerTest.java  |   2 +-
 .../flink/source/reader/RecordAndPosTest.java      |   6 +-
 .../source/testutils/FlinkRowAssertionsUtils.java  |  15 +-
 .../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 231 ++++++++++++++++++++-
 fluss-test-coverage/pom.xml                        |   1 +
 22 files changed, 648 insertions(+), 49 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
index 147043fdc..031444501 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeRecordRecordEmitter.java
@@ -44,8 +44,26 @@ public class LakeRecordRecordEmitter<OUT> {
             ((LakeSnapshotSplitState) 
splitState).setRecordsToSkip(recordAndPos.readRecordsCount());
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else if (splitState instanceof LakeSnapshotAndFlussLogSplitState) {
-            ((LakeSnapshotAndFlussLogSplitState) splitState)
-                    .setRecordsToSkip(recordAndPos.readRecordsCount());
+            LakeSnapshotAndFlussLogSplitState 
lakeSnapshotAndFlussLogSplitState =
+                    (LakeSnapshotAndFlussLogSplitState) splitState;
+
+            // set current split index
+            lakeSnapshotAndFlussLogSplitState.setCurrentLakeSplitIndex(
+                    recordAndPos.getCurrentSplitIndex());
+
+            // set records to skip to state
+            if (recordAndPos.readRecordsCount() > 0) {
+                
lakeSnapshotAndFlussLogSplitState.setRecordsToSkip(recordAndPos.readRecordsCount());
+            }
+
+            ScanRecord scanRecord = recordAndPos.record();
+            // todo: may need a state to mark snapshot phase is finished
+            // just like what we do for HybridSnapshotLogSplitState
+            if (scanRecord.logOffset() >= 0) {
+                // the record has a valid offset, meaning it is in the incremental phase,
+                // update the log offset
+                
lakeSnapshotAndFlussLogSplitState.setNextLogOffset(scanRecord.logOffset() + 1);
+            }
             sourceOutputFunc.accept(recordAndPos.record(), sourceOutput);
         } else {
             throw new UnsupportedOperationException(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index e420de630..cd0926294 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -46,6 +46,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.fluss.client.table.scanner.log.LogScanner.EARLIEST_OFFSET;
+import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
 import static 
org.apache.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
 
 /** A generator for lake splits. */
@@ -240,7 +241,8 @@ public class LakeSplitGenerator {
                                     tableBucket, partitionName, 
EARLIEST_OFFSET, stoppingOffset));
                 } else {
                     // need to read remain fluss log
-                    if (snapshotLogOffset < stoppingOffset) {
+                    if (stoppingOffset == NO_STOPPING_OFFSET
+                            || snapshotLogOffset < stoppingOffset) {
                         splits.add(
                                 new LogSplit(
                                         tableBucket,
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
index 36958736a..70734a490 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitReaderGenerator.java
@@ -18,8 +18,10 @@
 package org.apache.fluss.flink.lake;
 
 import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.flink.lake.reader.LakeSnapshotAndLogSplitScanner;
 import org.apache.fluss.flink.lake.reader.LakeSnapshotScanner;
+import org.apache.fluss.flink.lake.reader.SeekableLakeSnapshotSplitScanner;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.reader.BoundedSplitReader;
@@ -29,6 +31,7 @@ import org.apache.fluss.lake.source.LakeSplit;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
 import java.util.Queue;
 
 /** A generator to generate reader for lake split. */
@@ -67,17 +70,30 @@ public class LakeSplitReaderGenerator {
             return new BoundedSplitReader(
                     lakeSnapshotScanner, 
lakeSnapshotSplit.getRecordsToSplit());
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
-            LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
-                    (LakeSnapshotAndFlussLogSplit) split;
-            LakeSnapshotAndLogSplitScanner lakeSnapshotAndLogSplitScanner =
-                    new LakeSnapshotAndLogSplitScanner(
-                            table, lakeSource, lakeSnapshotAndFlussLogSplit, 
projectedFields);
-            return new BoundedSplitReader(
-                    lakeSnapshotAndLogSplitScanner,
-                    lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
+            LakeSnapshotAndFlussLogSplit lakeSplit = 
(LakeSnapshotAndFlussLogSplit) split;
+            return new BoundedSplitReader(getBatchScanner(lakeSplit), 
lakeSplit.getRecordsToSkip());
         } else {
             throw new UnsupportedOperationException(
                     String.format("The split type of %s is not supported.", 
split.getClass()));
         }
     }
+
+    private BatchScanner getBatchScanner(LakeSnapshotAndFlussLogSplit 
lakeSplit) {
+        BatchScanner lakeBatchScanner;
+        if (lakeSplit.isStreaming()) {
+            lakeBatchScanner =
+                    new SeekableLakeSnapshotSplitScanner(
+                            lakeSource,
+                            lakeSplit.getLakeSplits() == null
+                                    ? Collections.emptyList()
+                                    : lakeSplit.getLakeSplits(),
+                            lakeSplit.getCurrentLakeSplitIndex());
+
+        } else {
+            lakeBatchScanner =
+                    new LakeSnapshotAndLogSplitScanner(
+                            table, lakeSource, lakeSplit, projectedFields);
+        }
+        return lakeBatchScanner;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index d4d200ff9..af2eeb2ad 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -77,6 +77,7 @@ public class LakeSplitSerializer {
                             .getStoppingOffset()
                             .orElse(LogSplit.NO_STOPPING_OFFSET));
             out.writeLong(lakeSnapshotAndFlussLogSplit.getRecordsToSkip());
+            
out.writeInt(lakeSnapshotAndFlussLogSplit.getCurrentLakeSplitIndex());
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported split type: " + split.getClass().getName());
@@ -113,13 +114,15 @@ public class LakeSplitSerializer {
             long startingOffset = input.readLong();
             long stoppingOffset = input.readLong();
             long recordsToSkip = input.readLong();
+            int splitIndex = input.readInt();
             return new LakeSnapshotAndFlussLogSplit(
                     tableBucket,
                     partition,
                     lakeSplits,
                     startingOffset,
                     stoppingOffset,
-                    recordsToSkip);
+                    recordsToSkip,
+                    splitIndex);
         } else {
             throw new UnsupportedOperationException("Unsupported split kind: " 
+ splitKind);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
new file mode 100644
index 000000000..4af2b5add
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/IndexedLakeSplitRecordIterator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.lake.reader;
+
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+/**
+ * An iterator wrapper that converts LogRecord objects to InternalRow objects 
while tracking the
+ * current LakeSplit index being processed.
+ *
+ * <p>This class serves as an adapter between the underlying LogRecord 
iterator and the InternalRow
+ * interface expected by consumers. It maintains reference to the specific 
LakeSplit index that is
+ * currently being iterated.
+ *
+ * <p>Primary responsibilities:
+ *
+ * <ul>
+ *   <li>Wraps a LogRecord iterator and exposes InternalRow objects
+ *   <li>Preserves the index of the LakeSplit being processed
+ *   <li>Provides clean resource management through Closeable interface
+ *   <li>Maintains iterator semantics for sequential data access
+ * </ul>
+ */
+public class IndexedLakeSplitRecordIterator implements 
CloseableIterator<InternalRow> {
+    private final CloseableIterator<LogRecord> logRecordIterators;
+    private final int currentLakeSplitIndex;
+
+    public IndexedLakeSplitRecordIterator(
+            CloseableIterator<LogRecord> logRecordIterators, int 
currentLakeSplitIndex) {
+        this.logRecordIterators = logRecordIterators;
+        this.currentLakeSplitIndex = currentLakeSplitIndex;
+    }
+
+    public int getCurrentLakeSplitIndex() {
+        return currentLakeSplitIndex;
+    }
+
+    @Override
+    public void close() {
+        logRecordIterators.close();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return logRecordIterators.hasNext();
+    }
+
+    @Override
+    public InternalRow next() {
+        return logRecordIterators.next().getRow();
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
new file mode 100644
index 000000000..b0df3c174
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/reader/SeekableLakeSnapshotSplitScanner.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.lake.reader;
+
+import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
+import org.apache.fluss.record.LogRecord;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.utils.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+
+/**
+ * A scanner that supports seeking to a specific LakeSplit and reading from 
that point.
+ *
+ * <p>This scanner enables direct positioning to any LakeSplit in the list 
using its index, and then
+ * sequentially reads data starting from that specific split. It provides 
fine-grained control over
+ * where to begin the scanning process.
+ *
+ * <p>Key capabilities:
+ *
+ * <ul>
+ *   <li>Direct seeking to any LakeSplit by index
+ *   <li>Sequential reading starting from the sought split
+ *   <li>Precise positioning within the split collection
+ *   <li>Resumable scanning from arbitrary positions
+ * </ul>
+ */
+public class SeekableLakeSnapshotSplitScanner implements BatchScanner {
+
+    private final List<LakeSplit> lakeSplits;
+    private int nextLakeSplitIndex;
+    private final LakeSource<LakeSplit> lakeSource;
+    private CloseableIterator<InternalRow> currentLakeRecordIterator;
+
+    public SeekableLakeSnapshotSplitScanner(
+            LakeSource<LakeSplit> lakeSource,
+            List<LakeSplit> lakeSplits,
+            int currentLakeSplitIndex) {
+        // add lake splits
+        this.lakeSplits = lakeSplits;
+        this.nextLakeSplitIndex = currentLakeSplitIndex;
+        this.lakeSource = lakeSource;
+    }
+
+    @Nullable
+    @Override
+    public CloseableIterator<InternalRow> pollBatch(Duration timeout) throws 
IOException {
+        if (currentLakeRecordIterator == null) {
+            updateCurrentIterator();
+        }
+
+        // has no next record in currentIterator, update currentIterator
+        if (currentLakeRecordIterator != null && 
!currentLakeRecordIterator.hasNext()) {
+            updateCurrentIterator();
+        }
+
+        return currentLakeRecordIterator != null && 
currentLakeRecordIterator.hasNext()
+                ? currentLakeRecordIterator
+                : null;
+    }
+
+    private void updateCurrentIterator() throws IOException {
+        if (lakeSplits.size() <= nextLakeSplitIndex) {
+            currentLakeRecordIterator = null;
+        } else {
+            int currentLakeSplitIndex = nextLakeSplitIndex;
+            LakeSplit split = lakeSplits.get(currentLakeSplitIndex);
+            CloseableIterator<LogRecord> lakeRecords =
+                    lakeSource
+                            
.createRecordReader((LakeSource.ReaderContext<LakeSplit>) () -> split)
+                            .read();
+            currentLakeRecordIterator =
+                    new IndexedLakeSplitRecordIterator(lakeRecords, 
currentLakeSplitIndex);
+            nextLakeSplitIndex += 1;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (currentLakeRecordIterator != null) {
+            currentLakeRecordIterator.close();
+            currentLakeRecordIterator = null;
+        }
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index 0c2d60dbb..74d549f22 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -32,31 +32,27 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
 
     public static final byte LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND = -2;
 
-    // may be null when no snapshot data for the bucket
+    // may be null when no lake snapshot data for the bucket
     @Nullable private final List<LakeSplit> lakeSnapshotSplits;
 
-    /** The records to skip when reading the splits. */
-    private long recordOffset = 0;
-    // TODO: Support skip read file by record fileOffset
+    /**
+     * The index of the current split in the lake snapshot splits to be read,
+     * enabling the reader to skip already-read lake splits via this index.
+     */
+    private int currentLakeSplitIndex;
+    /** The records to skip when reading a split. */
+    private long recordOffset;
 
-    private final long startingOffset;
+    private long startingOffset;
     private final long stoppingOffset;
 
-    public LakeSnapshotAndFlussLogSplit(
-            TableBucket tableBucket,
-            @Nullable List<LakeSplit> snapshotSplits,
-            long startingOffset,
-            long stoppingOffset) {
-        this(tableBucket, null, snapshotSplits, startingOffset, 
stoppingOffset, 0);
-    }
-
     public LakeSnapshotAndFlussLogSplit(
             TableBucket tableBucket,
             @Nullable String partitionName,
             @Nullable List<LakeSplit> snapshotSplits,
             long startingOffset,
             long stoppingOffset) {
-        this(tableBucket, partitionName, snapshotSplits, startingOffset, 
stoppingOffset, 0);
+        this(tableBucket, partitionName, snapshotSplits, startingOffset, 
stoppingOffset, 0, 0);
     }
 
     public LakeSnapshotAndFlussLogSplit(
@@ -65,12 +61,14 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
             @Nullable List<LakeSplit> snapshotSplits,
             long startingOffset,
             long stoppingOffset,
-            long recordsToSkip) {
+            long recordsToSkip,
+            int currentLakeSplitIndex) {
         super(tableBucket, partitionName);
         this.lakeSnapshotSplits = snapshotSplits;
         this.startingOffset = startingOffset;
         this.stoppingOffset = stoppingOffset;
         this.recordOffset = recordsToSkip;
+        this.currentLakeSplitIndex = currentLakeSplitIndex;
     }
 
     public LakeSnapshotAndFlussLogSplit updateWithRecordsToSkip(long 
recordsToSkip) {
@@ -78,6 +76,16 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
         return this;
     }
 
+    public LakeSnapshotAndFlussLogSplit updateWithCurrentLakeSplitIndex(int 
currentLakeSplitIndex) {
+        this.currentLakeSplitIndex = currentLakeSplitIndex;
+        return this;
+    }
+
+    public LakeSnapshotAndFlussLogSplit updateWithStartingOffset(long 
startingOffset) {
+        this.startingOffset = startingOffset;
+        return this;
+    }
+
     public long getRecordsToSkip() {
         return recordOffset;
     }
@@ -95,6 +103,10 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
         return true;
     }
 
+    public boolean isStreaming() {
+        return !getStoppingOffset().isPresent();
+    }
+
     protected byte splitKind() {
         return LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND;
     }
@@ -104,10 +116,15 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
         return toSplitId("lake-hybrid-snapshot-log-", tableBucket);
     }
 
+    @Nullable
     public List<LakeSplit> getLakeSplits() {
         return lakeSnapshotSplits;
     }
 
+    public int getCurrentLakeSplitIndex() {
+        return currentLakeSplitIndex;
+    }
+
     @Override
     public String toString() {
         return "LakeSnapshotAndFlussLogSplit{"
@@ -115,6 +132,8 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
                 + lakeSnapshotSplits
                 + ", recordOffset="
                 + recordOffset
+                + ", currentLakeSplitIndex="
+                + currentLakeSplitIndex
                 + ", startingOffset="
                 + startingOffset
                 + ", stoppingOffset="
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
index c7ddf59d1..d46b6575f 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotAndFlussLogSplitState.java
@@ -27,19 +27,33 @@ public class LakeSnapshotAndFlussLogSplitState extends 
SourceSplitState {
 
     private long recordsToSkip;
     private final LakeSnapshotAndFlussLogSplit split;
+    private int currentLakeSplitIndex;
+    private long nextLogOffset;
 
     public LakeSnapshotAndFlussLogSplitState(LakeSnapshotAndFlussLogSplit 
split) {
         super(split);
         this.recordsToSkip = split.getRecordsToSkip();
         this.split = split;
+        this.currentLakeSplitIndex = split.getCurrentLakeSplitIndex();
+        this.nextLogOffset = split.getStartingOffset();
     }
 
     public void setRecordsToSkip(long recordsToSkip) {
         this.recordsToSkip = recordsToSkip;
     }
 
+    public void setCurrentLakeSplitIndex(int currentLakeSplitIndex) {
+        this.currentLakeSplitIndex = currentLakeSplitIndex;
+    }
+
+    public void setNextLogOffset(long nextOffset) {
+        this.nextLogOffset = nextOffset;
+    }
+
     @Override
     public SourceSplitBase toSourceSplit() {
-        return split.updateWithRecordsToSkip(recordsToSkip);
+        return split.updateWithCurrentLakeSplitIndex(currentLakeSplitIndex)
+                .updateWithRecordsToSkip(recordsToSkip)
+                .updateWithStartingOffset(nextLogOffset);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index 24a1a11bd..49f0d5ff7 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.fluss.flink.source.split.LogSplit.NO_STOPPING_OFFSET;
+
 /**
  * An implementation of {@link OffsetsInitializer} which does not initialize 
anything.
  *
@@ -37,6 +39,6 @@ public class NoStoppingOffsetsInitializer implements 
OffsetsInitializer {
             @Nullable String partitionName,
             Collection<Integer> buckets,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever) {
-        return buckets.stream().collect(Collectors.toMap(x -> x, x -> 
Long.MAX_VALUE));
+        return buckets.stream().collect(Collectors.toMap(x -> x, x -> 
NO_STOPPING_OFFSET));
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
index cd8d63405..957a307cd 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/BoundedSplitReader.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.source.reader;
 
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
+import org.apache.fluss.flink.lake.reader.IndexedLakeSplitRecordIterator;
 import org.apache.fluss.flink.source.split.SnapshotSplit;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.utils.CloseableIterator;
@@ -135,9 +136,14 @@ public class BoundedSplitReader implements AutoCloseable {
 
     private static class ScanRecordBatch implements 
CloseableIterator<ScanRecord> {
         private final CloseableIterator<InternalRow> rowIterator;
+        private int currentSplitIndex;
 
         public ScanRecordBatch(CloseableIterator<InternalRow> rowIterator) {
             this.rowIterator = rowIterator;
+            if (rowIterator instanceof IndexedLakeSplitRecordIterator) {
+                currentSplitIndex =
+                        ((IndexedLakeSplitRecordIterator) 
rowIterator).getCurrentLakeSplitIndex();
+            }
         }
 
         @Override
@@ -154,6 +160,10 @@ public class BoundedSplitReader implements AutoCloseable {
         public void close() {
             rowIterator.close();
         }
+
+        public int getCurrentSplitIndex() {
+            return currentSplitIndex;
+        }
     }
 
     private class RecordAndPosBatch implements CloseableIterator<RecordAndPos> 
{
@@ -163,7 +173,12 @@ public class BoundedSplitReader implements AutoCloseable {
 
         RecordAndPosBatch replace(CloseableIterator<ScanRecord> records) {
             this.records = records;
-            recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT);
+            if (records instanceof ScanRecordBatch) {
+                int currentSplitIndex = ((ScanRecordBatch) 
records).getCurrentSplitIndex();
+                recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT, 
currentSplitIndex);
+            } else {
+                recordAndPosition.setRecord(null, NO_READ_RECORDS_COUNT);
+            }
             return this;
         }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
index b809feca5..fa0cb9e1b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java
@@ -27,6 +27,7 @@ import org.apache.fluss.client.table.scanner.log.ScanRecords;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.flink.lake.LakeSplitReaderGenerator;
+import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
 import org.apache.fluss.flink.source.metrics.FlinkSourceReaderMetrics;
 import org.apache.fluss.flink.source.split.HybridSnapshotLogSplit;
@@ -208,6 +209,17 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
                 subscribeLog(sourceSplitBase, 
sourceSplitBase.asLogSplit().getStartingOffset());
             } else if (sourceSplitBase.isLakeSplit()) {
                 getLakeSplitReader().addSplit(sourceSplitBase, boundedSplits);
+                if (sourceSplitBase instanceof LakeSnapshotAndFlussLogSplit) {
+                    LakeSnapshotAndFlussLogSplit lakeSnapshotAndFlussLogSplit =
+                            (LakeSnapshotAndFlussLogSplit) sourceSplitBase;
+                    if (lakeSnapshotAndFlussLogSplit.isStreaming()) {
+                        // this is a streaming split with no stopping offset, so we
+                        // also need to subscribe to the change log
+                        subscribeLog(
+                                lakeSnapshotAndFlussLogSplit,
+                                
lakeSnapshotAndFlussLogSplit.getStartingOffset());
+                    }
+                }
             } else {
                 throw new UnsupportedOperationException(
                         String.format(
@@ -503,7 +515,11 @@ public class FlinkSourceSplitReader implements 
SplitReader<RecordAndPos, SourceS
     private FlinkRecordsWithSplitIds finishCurrentBoundedSplit() throws 
IOException {
         Set<String> finishedSplits =
                 currentBoundedSplit instanceof HybridSnapshotLogSplit
-                        // is hybrid split, not to finish this split
+                                || (currentBoundedSplit instanceof 
LakeSnapshotAndFlussLogSplit
+                                        && ((LakeSnapshotAndFlussLogSplit) 
currentBoundedSplit)
+                                                .isStreaming())
+                        // is hybrid split, or is lakeAndFlussLog split in 
streaming mode,
+                        // not to finish this split
                         // since it remains log to read
                         ? Collections.emptySet()
                         : Collections.singleton(currentBoundedSplit.splitId());
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
index a6edff4ec..1358730dc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/MutableRecordAndPos.java
@@ -28,11 +28,16 @@ import org.apache.fluss.client.table.scanner.ScanRecord;
 public class MutableRecordAndPos extends RecordAndPos {
 
     public MutableRecordAndPos() {
-        super(null, NO_READ_RECORDS_COUNT);
+        super(null, NO_READ_RECORDS_COUNT, DEFAULT_SPLIT_INDEX);
     }
 
     public void setRecord(ScanRecord scanRecord, long readRecordsCount) {
+        setRecord(scanRecord, readRecordsCount, DEFAULT_SPLIT_INDEX);
+    }
+
+    public void setRecord(ScanRecord scanRecord, long readRecordsCount, int 
currentIteratorIndex) {
         this.scanRecord = scanRecord;
         this.readRecordsCount = readRecordsCount;
+        this.currentSplitIndex = currentIteratorIndex;
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
index 3f48b156d..1cdf93dce 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/RecordAndPos.java
@@ -23,9 +23,14 @@ import 
org.apache.fluss.flink.source.emitter.FlinkRecordEmitter;
 import java.util.Objects;
 
 /**
- * A record wrapping a Fluss {@link ScanRecord} and the {@code 
readRecordsCount} when the record is
- * from reading snapshot. When the record is from reading log, {@code 
readRecordsCount} will always
- * be {@link #NO_READ_RECORDS_COUNT}.
+ * A record wrapping a Fluss {@link ScanRecord}, the {@code readRecordsCount} 
when the record is
+ * from reading a snapshot, and the {@code currentSplitIndex} identifying which 
underlying split
+ * the record comes from when the Flink source split read by the split reader 
contains multiple
+ * splits (splittable units), like {@link
+ * org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit}, which 
contains multiple {@link
+ * org.apache.fluss.lake.source.LakeSplit}s.
+ *
+ * <p>When the record is from reading log, {@code readRecordsCount} will 
always be {@link
+ * #NO_READ_RECORDS_COUNT}.
  *
  * <p>The {@code readRecordsCount} defines the point in the snapshot reader 
AFTER the record. Record
  * processing and updating checkpointed state happens atomically. The position 
points to where the
@@ -39,25 +44,38 @@ import java.util.Objects;
 public class RecordAndPos {
 
     public static final long NO_READ_RECORDS_COUNT = -1;
+    protected static final int DEFAULT_SPLIT_INDEX = 0;
 
     protected ScanRecord scanRecord;
 
     // the read records count include this record when read this record
     protected long readRecordsCount;
 
+    // the index of the underlying split that this record was read from
+    protected int currentSplitIndex;
+
     public RecordAndPos(ScanRecord scanRecord) {
-        this(scanRecord, NO_READ_RECORDS_COUNT);
+        this(scanRecord, NO_READ_RECORDS_COUNT, DEFAULT_SPLIT_INDEX);
     }
 
     public RecordAndPos(ScanRecord scanRecord, long readRecordsCount) {
+        this(scanRecord, readRecordsCount, DEFAULT_SPLIT_INDEX);
+    }
+
+    public RecordAndPos(ScanRecord scanRecord, long readRecordsCount, int 
currentSplitIndex) {
         this.scanRecord = scanRecord;
         this.readRecordsCount = readRecordsCount;
+        this.currentSplitIndex = currentSplitIndex;
     }
 
     public long readRecordsCount() {
         return readRecordsCount;
     }
 
+    public int getCurrentSplitIndex() {
+        return currentSplitIndex;
+    }
+
     public ScanRecord record() {
         return scanRecord;
     }
@@ -72,12 +90,13 @@ public class RecordAndPos {
         }
         RecordAndPos that = (RecordAndPos) o;
         return readRecordsCount == that.readRecordsCount
+                && currentSplitIndex == that.currentSplitIndex
                 && Objects.equals(scanRecord, that.scanRecord);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(scanRecord, readRecordsCount);
+        return Objects.hash(scanRecord, readRecordsCount, currentSplitIndex);
     }
 
     @Override
@@ -87,6 +106,8 @@ public class RecordAndPos {
                 + scanRecord
                 + ", readRecordsCount="
                 + readRecordsCount
+                + ", currentSplitIndex="
+                + currentSplitIndex
                 + '}';
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index fa52a86cd..d508fb49b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
 import org.apache.fluss.flink.tiering.source.TableBucketWriteResult;
 import org.apache.fluss.flink.tiering.source.TieringSource;
 import org.apache.fluss.lake.committer.BucketOffset;
@@ -43,6 +44,7 @@ import org.apache.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
@@ -124,6 +126,17 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         admin = connection.getAdmin();
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        super.initializeState(context);
+        int attemptNumber = getRuntimeContext().getAttemptNumber();
+        if (attemptNumber > 0) {
+            // a non-zero attempt number means the job has restarted after a failover
+            operatorEventGateway.sendEventToCoordinator(
+                    new SourceEventWrapper(new TieringRestoreEvent()));
+        }
+    }
+
     @Override
     public void 
processElement(StreamRecord<TableBucketWriteResult<WriteResult>> streamRecord)
             throws Exception {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
new file mode 100644
index 000000000..77a48f9e7
--- /dev/null
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/TieringRestoreEvent.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.tiering.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+/** SourceEvent used to represent tiering is restoring. */
+public class TieringRestoreEvent implements SourceEvent {
+    private static final long serialVersionUID = 1L;
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 7d6a3b31a..b0b58a092 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -26,6 +26,7 @@ import org.apache.fluss.config.Configuration;
 import org.apache.fluss.flink.metrics.FlinkMetricRegistry;
 import org.apache.fluss.flink.tiering.event.FailedTieringEvent;
 import org.apache.fluss.flink.tiering.event.FinishedTieringEvent;
+import org.apache.fluss.flink.tiering.event.TieringRestoreEvent;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
 import 
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
@@ -217,6 +218,15 @@ public class TieringSourceEnumerator
             }
         }
 
+        if (sourceEvent instanceof TieringRestoreEvent) {
+            LOG.info(
+                    "Receiving tiering restore event, mark current tiering 
table epoch {} as failed.",
+                    tieringTableEpochs);
+            // mark all in-flight tiering table epochs as failed so they can be re-tiered
+            failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
+            tieringTableEpochs.clear();
+        }
+
         if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
             // call one round of heartbeat to notify table has been finished 
or failed
             this.context.callAsync(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
index 0f1796a78..43bcf5fce 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/LakeSourceUtils.java
@@ -56,8 +56,15 @@ public class LakeSourceUtils {
                 Configuration.fromMap(properties)
                         .get(ConfigOptions.TABLE_DATALAKE_FORMAT)
                         .toString();
-        LakeStoragePlugin lakeStoragePlugin =
-                LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+        LakeStoragePlugin lakeStoragePlugin;
+        try {
+            lakeStoragePlugin = 
LakeStoragePluginSetUp.fromDataLakeFormat(dataLake, null);
+        } catch (UnsupportedOperationException e) {
+            LOG.info(
+                    "No LakeStoragePlugin can be found for datalake format: 
{}, return null to disable reading from lake source.",
+                    dataLake);
+            return null;
+        }
         LakeStorage lakeStorage = 
checkNotNull(lakeStoragePlugin).createLakeStorage(lakeConfig);
         try {
             return (LakeSource<LakeSplit>) 
lakeStorage.createLakeSource(tablePath);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index 15bb5d7d0..bd6081417 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -54,7 +54,7 @@ class LakeSplitSerializerTest {
     private final SimpleVersionedSerializer<LakeSplit> sourceSplitSerializer =
             new TestSimpleVersionedSerializer();
 
-    private TableBucket tableBucket = new TableBucket(0, 1L, 0);
+    private final TableBucket tableBucket = new TableBucket(0, 1L, 0);
 
     private final LakeSplitSerializer serializer = new 
LakeSplitSerializer(sourceSplitSerializer);
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
index 76777fa0c..3a975b3e4 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/reader/RecordAndPosTest.java
@@ -49,8 +49,10 @@ class RecordAndPosTest {
         assertThat(recordAndPos2).isNotEqualTo(recordAndPos);
 
         assertThat(recordAndPos.toString())
-                .isEqualTo("RecordAndPos{scanRecord=+A(1,null,3)@0, 
readRecordsCount=-1}");
+                .isEqualTo(
+                        "RecordAndPos{scanRecord=+A(1,null,3)@0, 
readRecordsCount=-1, currentSplitIndex=0}");
         assertThat(recordAndPos2.toString())
-                .isEqualTo("RecordAndPos{scanRecord=+A(1,null,3)@0, 
readRecordsCount=3}");
+                .isEqualTo(
+                        "RecordAndPos{scanRecord=+A(1,null,3)@0, 
readRecordsCount=3, currentSplitIndex=0}");
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
index 87c7e14c1..5946d906c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java
@@ -54,6 +54,17 @@ public class FlinkRowAssertionsUtils {
                 .containsExactlyInAnyOrderElementsOf(expected);
     }
 
+    public static void assertResultsExactOrder(
+            CloseableIterator<Row> iterator, List<Row> expected, boolean 
closeIterator) {
+        List<String> actual = collectRowsWithTimeout(iterator, 
expected.size(), closeIterator);
+        assertThat(actual)
+                .as(
+                        "Expected %d records but got %d after waiting. Actual 
results: %s",
+                        expected.size(), actual.size(), actual)
+                .containsExactlyElementsOf(
+                        
expected.stream().map(Row::toString).collect(Collectors.toList()));
+    }
+
     public static void assertQueryResultExactOrder(
             TableEnvironment env, String query, List<String> expected) throws 
Exception {
         try (CloseableIterator<Row> rowIter = env.executeSql(query).collect()) 
{
@@ -100,7 +111,7 @@ public class FlinkRowAssertionsUtils {
             for (int i = 0; i < expectedCount; i++) {
                 // Wait for next record with timeout
                 if (!waitForNextWithTimeout(
-                        iterator, deadlineTimeMs - 
System.currentTimeMillis())) {
+                        iterator, Math.max(deadlineTimeMs - 
System.currentTimeMillis(), 1_000))) {
                     throw timeoutError(
                             System.currentTimeMillis() - startTimeMs, 
expectedCount, actual.size());
                 }
@@ -153,9 +164,11 @@ public class FlinkRowAssertionsUtils {
     private static boolean waitForNextWithTimeout(
             CloseableIterator<Row> iterator, long maxWaitTime) {
         CompletableFuture<Boolean> future = 
CompletableFuture.supplyAsync(iterator::hasNext);
+        System.out.println("Waiting for " + maxWaitTime + " ms to finish.");
         try {
             return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
         } catch (TimeoutException e) {
+            System.err.println("Timeout waiting for " + maxWaitTime + " ms to 
finish.");
             future.cancel(true);
             return false;
         } catch (Exception e) {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 038d32a12..4bf3e97cd 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -33,6 +33,8 @@ import org.apache.fluss.types.DataTypes;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -50,6 +52,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -119,9 +123,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
 
         List<Row> expectedRows = new ArrayList<>();
         if (isPartitioned) {
-            List<String> partitions = Arrays.asList("2025", "2026");
-
-            for (String partition : partitions) {
+            for (String partition : waitUntilPartitions(t1).values()) {
                 expectedRows.add(
                         Row.of(
                                 false,
@@ -266,9 +268,7 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
 
         expectedRows = new ArrayList<>();
         if (isPartitioned) {
-            List<String> partitions = Arrays.asList("2025", "2026");
-
-            for (String partition : partitions) {
+            for (String partition : waitUntilPartitions(t1).values()) {
                 expectedRows.add(
                         Row.of(
                                 false,
@@ -373,6 +373,225 @@ class FlinkUnionReadPrimaryKeyTableITCase extends 
FlinkUnionReadTestBase {
         
assertThat(projectRows2.toString()).isEqualTo(sortedRows(expectedProjectRows2).toString());
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testUnionReadInStreamMode(Boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName =
+                "stream_pk_table_full" + (isPartitioned ? "_partitioned" : 
"_non_partitioned");
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        // create table & write initial data
+        long tableId =
+                preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+
+        // wait until records have been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // check the status of replica after synced
+        assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, 
bucketLogEndOffset);
+
+        // will read paimon snapshot, should only +I since no change log
+        List<Row> expectedRows = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(t1).values()) {
+                expectedRows.add(
+                        Row.of(
+                                false,
+                                (byte) 1,
+                                (short) 2,
+                                3,
+                                4L,
+                                5.1f,
+                                6.0d,
+                                "string",
+                                Decimal.fromUnscaledLong(9, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(10), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273182L),
+                                TimestampLtz.fromEpochMillis(1698235273182L, 
5000),
+                                TimestampNtz.fromMillis(1698235273183L),
+                                TimestampNtz.fromMillis(1698235273183L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+                expectedRows.add(
+                        Row.of(
+                                true,
+                                (byte) 10,
+                                (short) 20,
+                                30,
+                                40L,
+                                50.1f,
+                                60.0d,
+                                "another_string",
+                                Decimal.fromUnscaledLong(90, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273200L),
+                                TimestampLtz.fromEpochMillis(1698235273200L, 
5000),
+                                TimestampNtz.fromMillis(1698235273201L),
+                                TimestampNtz.fromMillis(1698235273201L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+            }
+        } else {
+            expectedRows =
+                    Arrays.asList(
+                            Row.of(
+                                    false,
+                                    (byte) 1,
+                                    (short) 2,
+                                    3,
+                                    4L,
+                                    5.1f,
+                                    6.0d,
+                                    "string",
+                                    Decimal.fromUnscaledLong(9, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(10), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273182L),
+                                    
TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+                                    TimestampNtz.fromMillis(1698235273183L),
+                                    TimestampNtz.fromMillis(1698235273183L, 
6000),
+                                    new byte[] {1, 2, 3, 4},
+                                    null),
+                            Row.of(
+                                    true,
+                                    (byte) 10,
+                                    (short) 20,
+                                    30,
+                                    40L,
+                                    50.1f,
+                                    60.0d,
+                                    "another_string",
+                                    Decimal.fromUnscaledLong(90, 5, 2),
+                                    Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                    
TimestampLtz.fromEpochMillis(1698235273200L),
+                                    
TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                                    TimestampNtz.fromMillis(1698235273201L),
+                                    TimestampNtz.fromMillis(1698235273201L, 
6000),
+                                    new byte[] {1, 2, 3, 4},
+                                    null));
+        }
+
+        String query = "select * from " + tableName;
+        CloseableIterator<Row> actual = streamTEnv.executeSql(query).collect();
+        assertRowResultsIgnoreOrder(actual, expectedRows, false);
+
+        // stop lake tiering service
+        jobClient.cancel().get();
+
+        // write a row again
+        if (isPartitioned) {
+            Map<Long, String> partitionNameById = waitUntilPartitions(t1);
+            for (String partition : partitionNameById.values()) {
+                writeFullTypeRow(t1, partition);
+            }
+        } else {
+            writeFullTypeRow(t1, null);
+        }
+
+        // should generate -U & +U
+        List<Row> expectedRows2 = new ArrayList<>();
+        if (isPartitioned) {
+            for (String partition : waitUntilPartitions(t1).values()) {
+                expectedRows2.add(
+                        Row.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                true,
+                                (byte) 10,
+                                (short) 20,
+                                30,
+                                40L,
+                                50.1f,
+                                60.0d,
+                                "another_string",
+                                Decimal.fromUnscaledLong(90, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273200L),
+                                TimestampLtz.fromEpochMillis(1698235273200L, 
5000),
+                                TimestampNtz.fromMillis(1698235273201L),
+                                TimestampNtz.fromMillis(1698235273201L, 6000),
+                                new byte[] {1, 2, 3, 4},
+                                partition));
+                expectedRows2.add(
+                        Row.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                (byte) 100,
+                                (short) 200,
+                                30,
+                                400L,
+                                500.1f,
+                                600.0d,
+                                "another_string_2",
+                                Decimal.fromUnscaledLong(900, 5, 2),
+                                Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                                TimestampLtz.fromEpochMillis(1698235273400L),
+                                TimestampLtz.fromEpochMillis(1698235273400L, 
7000),
+                                TimestampNtz.fromMillis(1698235273501L),
+                                TimestampNtz.fromMillis(1698235273501L, 8000),
+                                new byte[] {5, 6, 7, 8},
+                                partition));
+            }
+        } else {
+            expectedRows2.add(
+                    Row.ofKind(
+                            RowKind.UPDATE_BEFORE,
+                            true,
+                            (byte) 10,
+                            (short) 20,
+                            30,
+                            40L,
+                            50.1f,
+                            60.0d,
+                            "another_string",
+                            Decimal.fromUnscaledLong(90, 5, 2),
+                            Decimal.fromBigDecimal(new 
java.math.BigDecimal(100), 20, 0),
+                            TimestampLtz.fromEpochMillis(1698235273200L),
+                            TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+                            TimestampNtz.fromMillis(1698235273201L),
+                            TimestampNtz.fromMillis(1698235273201L, 6000),
+                            new byte[] {1, 2, 3, 4},
+                            null));
+            expectedRows2.add(
+                    Row.ofKind(
+                            RowKind.UPDATE_AFTER,
+                            true,
+                            (byte) 100,
+                            (short) 200,
+                            30,
+                            400L,
+                            500.1f,
+                            600.0d,
+                            "another_string_2",
+                            Decimal.fromUnscaledLong(900, 5, 2),
+                            Decimal.fromBigDecimal(new 
java.math.BigDecimal(1000), 20, 0),
+                            TimestampLtz.fromEpochMillis(1698235273400L),
+                            TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+                            TimestampNtz.fromMillis(1698235273501L),
+                            TimestampNtz.fromMillis(1698235273501L, 8000),
+                            new byte[] {5, 6, 7, 8},
+                            null));
+        }
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+        } else {
+            assertResultsExactOrder(actual, expectedRows2, true);
+        }
+
+        // query again
+        actual = streamTEnv.executeSql(query).collect();
+        List<Row> totalExpectedRows = new ArrayList<>(expectedRows);
+        totalExpectedRows.addAll(expectedRows2);
+
+        if (isPartitioned) {
+            assertRowResultsIgnoreOrder(actual, totalExpectedRows, true);
+        } else {
+            assertResultsExactOrder(actual, totalExpectedRows, true);
+        }
+    }
+
     private List<Row> sortedRows(List<Row> rows) {
         rows.sort(Comparator.comparing(Row::toString));
         return rows;
diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml
index aaa08c07d..f2baea535 100644
--- a/fluss-test-coverage/pom.xml
+++ b/fluss-test-coverage/pom.xml
@@ -392,6 +392,7 @@
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSourceOptions</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource.Builder</exclude>
                                         
<exclude>org.apache.fluss.flink.tiering.source.TieringSource</exclude>
+                                        
<exclude>org.apache.fluss.flink.tiering.event.TieringRestoreEvent</exclude>
                                         <exclude>
                                             
org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator
                                         </exclude>

Reply via email to