This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 79379968d [flink] Union read support log table in streaming mode  
(#1575)
79379968d is described below

commit 79379968d0f8b36ac48fb0636a3fd65bb06ea84d
Author: CaoZhen <[email protected]>
AuthorDate: Wed Aug 27 09:55:43 2025 +0800

    [flink] Union read support log table in streaming mode  (#1575)
---
 .../java/org/apache/fluss/client/admin/Admin.java  |   2 +
 .../fluss/flink/lake/LakeSplitGenerator.java       |  49 ++++---
 .../fluss/flink/lake/LakeSplitSerializer.java      |  10 +-
 .../lake/split/LakeSnapshotAndFlussLogSplit.java   |  19 +++
 .../fluss/flink/lake/split/LakeSnapshotSplit.java  |  44 ++++++-
 .../flink/lake/state/LakeSnapshotSplitState.java   |   1 +
 .../org/apache/fluss/flink/source/FlinkSource.java |   3 +-
 .../source/enumerator/FlinkSourceEnumerator.java   | 141 ++++++++++++++++-----
 .../initializer/NoStoppingOffsetsInitializer.java  |   4 +-
 .../FlussSourceEnumeratorStateSerializer.java      |  71 ++++++++++-
 .../flink/source/state/SourceEnumeratorState.java  |  18 ++-
 .../source/enumerator/TieringSourceEnumerator.java |   6 +
 .../fluss/flink/lake/LakeSplitSerializerTest.java  |   2 +-
 .../enumerator/FlinkSourceEnumeratorTest.java      |   1 +
 .../state/SourceEnumeratorStateSerializerTest.java |   6 +-
 fluss-lake/fluss-lake-paimon/pom.xml               |   1 +
 .../paimon/flink/FlinkUnionReadLogTableITCase.java |  50 +++++++-
 .../lake/paimon/flink/FlinkUnionReadTestBase.java  |  19 +++
 18 files changed, 371 insertions(+), 76 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index 6ce225d25..baaab3507 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -31,6 +31,7 @@ import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidReplicationFactorException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.KvSnapshotNotExistException;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.exception.NonPrimaryKeyTableException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
@@ -383,6 +384,7 @@ public interface Admin extends AutoCloseable {
      *
      * <ul>
      *   <li>{@link TableNotExistException} if the table does not exist.
+     *   <li>{@link LakeTableSnapshotNotExistException} if no any lake 
snapshot exist.
      * </ul>
      *
      * @param tablePath the table path of the table.
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
index 1854fd26f..e420de630 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitGenerator.java
@@ -19,6 +19,7 @@ package org.apache.fluss.flink.lake;
 
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.metadata.LakeSnapshot;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
 import org.apache.fluss.flink.lake.split.LakeSnapshotAndFlussLogSplit;
 import org.apache.fluss.flink.lake.split.LakeSnapshotSplit;
 import org.apache.fluss.flink.source.enumerator.initializer.OffsetsInitializer;
@@ -29,6 +30,7 @@ import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
+import org.apache.fluss.utils.ExceptionUtils;
 
 import javax.annotation.Nullable;
 
@@ -75,10 +77,23 @@ public class LakeSplitGenerator {
         this.listPartitionSupplier = listPartitionSupplier;
     }
 
-    public List<SourceSplitBase> generateHybridLakeSplits() throws Exception {
-        // get the file store
-        LakeSnapshot lakeSnapshotInfo =
-                
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+    /**
+     * Return A list of hybrid lake snapshot {@link LakeSnapshotSplit}, {@link
+     * LakeSnapshotAndFlussLogSplit} and the corresponding Fluss {@link 
LogSplit} based on the lake
+     * snapshot. Return null if no lake snapshot exists.
+     */
+    @Nullable
+    public List<SourceSplitBase> generateHybridLakeFlussSplits() throws 
Exception {
+        LakeSnapshot lakeSnapshotInfo;
+        try {
+            lakeSnapshotInfo = 
flussAdmin.getLatestLakeSnapshot(tableInfo.getTablePath()).get();
+        } catch (Exception exception) {
+            if (ExceptionUtils.stripExecutionException(exception)
+                    instanceof LakeTableSnapshotNotExistException) {
+                return null;
+            }
+            throw exception;
+        }
 
         boolean isLogTable = !tableInfo.hasPrimaryKey();
         boolean isPartitioned = tableInfo.isPartitioned();
@@ -90,10 +105,7 @@ public class LakeSplitGenerator {
                                         (LakeSource.PlannerContext) 
lakeSnapshotInfo::getSnapshotId)
                                 .plan());
 
-        if (lakeSplits.isEmpty()) {
-            return Collections.emptyList();
-        }
-
+        Map<TableBucket, Long> tableBucketsOffset = 
lakeSnapshotInfo.getTableBucketsOffset();
         if (isPartitioned) {
             Set<PartitionInfo> partitionInfos = listPartitionSupplier.get();
             Map<Long, String> partitionNameById =
@@ -103,16 +115,13 @@ public class LakeSplitGenerator {
                                             PartitionInfo::getPartitionId,
                                             PartitionInfo::getPartitionName));
             return generatePartitionTableSplit(
-                    lakeSplits,
-                    isLogTable,
-                    lakeSnapshotInfo.getTableBucketsOffset(),
-                    partitionNameById);
+                    lakeSplits, isLogTable, tableBucketsOffset, 
partitionNameById);
         } else {
             Map<Integer, List<LakeSplit>> nonPartitionLakeSplits =
                     lakeSplits.values().iterator().next();
             // non-partitioned table
             return generateNoPartitionedTableSplit(
-                    nonPartitionLakeSplits, isLogTable, 
lakeSnapshotInfo.getTableBucketsOffset());
+                    nonPartitionLakeSplits, isLogTable, tableBucketsOffset);
         }
     }
 
@@ -134,8 +143,7 @@ public class LakeSplitGenerator {
             Map<String, Map<Integer, List<LakeSplit>>> lakeSplits,
             boolean isLogTable,
             Map<TableBucket, Long> tableBucketSnapshotLogOffset,
-            Map<Long, String> partitionNameById)
-            throws Exception {
+            Map<Long, String> partitionNameById) {
         List<SourceSplitBase> splits = new ArrayList<>();
         Map<String, Long> flussPartitionIdByName =
                 partitionNameById.entrySet().stream()
@@ -189,7 +197,7 @@ public class LakeSplitGenerator {
         // iterate remain fluss splits
         for (Map.Entry<String, Long> partitionIdByNameEntry : 
flussPartitionIdByName.entrySet()) {
             String partitionName = partitionIdByNameEntry.getKey();
-            long partitionId = partitionIdByNameEntry.getValue();
+            Long partitionId = partitionIdByNameEntry.getValue();
             Map<Integer, Long> bucketEndOffset =
                     stoppingOffsetInitializer.getBucketOffsets(
                             partitionName,
@@ -224,7 +232,7 @@ public class LakeSplitGenerator {
                 TableBucket tableBucket =
                         new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
                 Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
-                long stoppingOffset = bucketEndOffset.get(bucket);
+                Long stoppingOffset = bucketEndOffset.get(bucket);
                 if (snapshotLogOffset == null) {
                     // no any data commit to this bucket, scan from fluss log
                     splits.add(
@@ -248,7 +256,7 @@ public class LakeSplitGenerator {
                 TableBucket tableBucket =
                         new TableBucket(tableInfo.getTableId(), partitionId, 
bucket);
                 Long snapshotLogOffset = 
tableBucketSnapshotLogOffset.get(tableBucket);
-                long stoppingOffset = bucketEndOffset.get(bucket);
+                Long stoppingOffset = bucketEndOffset.get(bucket);
                 splits.add(
                         generateSplitForPrimaryKeyTableBucket(
                                 lakeSplits != null ? lakeSplits.get(bucket) : 
null,
@@ -267,11 +275,14 @@ public class LakeSplitGenerator {
             @Nullable String partitionName,
             @Nullable Long partitionId) {
         List<SourceSplitBase> splits = new ArrayList<>();
+        // we may have multiple table buckets; so we need to
+        // introduce an index to make split unique
+        int index = 0;
         for (LakeSplit lakeSplit :
                 
lakeSplits.values().stream().flatMap(List::stream).collect(Collectors.toList()))
 {
             TableBucket tableBucket =
                     new TableBucket(tableInfo.getTableId(), partitionId, 
lakeSplit.bucket());
-            splits.add(new LakeSnapshotSplit(tableBucket, partitionName, 
lakeSplit));
+            splits.add(new LakeSnapshotSplit(tableBucket, partitionName, 
lakeSplit, index++));
         }
         return splits;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
index 88c4e6db8..d4d200ff9 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeSplitSerializer.java
@@ -48,8 +48,9 @@ public class LakeSplitSerializer {
 
     public void serialize(DataOutputSerializer out, SourceSplitBase split) 
throws IOException {
         if (split instanceof LakeSnapshotSplit) {
-            byte[] serializeBytes =
-                    sourceSplitSerializer.serialize(((LakeSnapshotSplit) 
split).getLakeSplit());
+            LakeSnapshotSplit lakeSplit = (LakeSnapshotSplit) split;
+            out.writeInt(lakeSplit.getSplitIndex());
+            byte[] serializeBytes = 
sourceSplitSerializer.serialize(lakeSplit.getLakeSplit());
             out.writeInt(serializeBytes.length);
             out.write(serializeBytes);
         } else if (split instanceof LakeSnapshotAndFlussLogSplit) {
@@ -89,12 +90,13 @@ public class LakeSplitSerializer {
             DataInputDeserializer input)
             throws IOException {
         if (splitKind == LAKE_SNAPSHOT_SPLIT_KIND) {
+            int splitIndex = input.readInt();
             byte[] serializeBytes = new byte[input.readInt()];
             input.read(serializeBytes);
-            LakeSplit fileStoreSourceSplit =
+            LakeSplit lakeSplit =
                     sourceSplitSerializer.deserialize(
                             sourceSplitSerializer.getVersion(), 
serializeBytes);
-            return new LakeSnapshotSplit(tableBucket, partition, 
fileStoreSourceSplit);
+            return new LakeSnapshotSplit(tableBucket, partition, lakeSplit, 
splitIndex);
         } else if (splitKind == LAKE_SNAPSHOT_FLUSS_LOG_SPLIT_KIND) {
             List<LakeSplit> lakeSplits = null;
             if (input.readBoolean()) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
index c95bb51d4..0c2d60dbb 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotAndFlussLogSplit.java
@@ -107,4 +107,23 @@ public class LakeSnapshotAndFlussLogSplit extends 
SourceSplitBase {
     public List<LakeSplit> getLakeSplits() {
         return lakeSnapshotSplits;
     }
+
+    @Override
+    public String toString() {
+        return "LakeSnapshotAndFlussLogSplit{"
+                + "lakeSnapshotSplits="
+                + lakeSnapshotSplits
+                + ", recordOffset="
+                + recordOffset
+                + ", startingOffset="
+                + startingOffset
+                + ", stoppingOffset="
+                + stoppingOffset
+                + ", tableBucket="
+                + tableBucket
+                + ", partitionName='"
+                + partitionName
+                + '\''
+                + '}';
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
index 1cc883d3f..b85d990fe 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/split/LakeSnapshotSplit.java
@@ -32,18 +32,25 @@ public class LakeSnapshotSplit extends SourceSplitBase {
 
     private final long recordsToSplit;
 
+    private final int splitIndex;
+
     public LakeSnapshotSplit(
-            TableBucket tableBucket, @Nullable String partitionName, LakeSplit 
lakeSplit) {
-        this(tableBucket, partitionName, lakeSplit, 0);
+            TableBucket tableBucket,
+            @Nullable String partitionName,
+            LakeSplit lakeSplit,
+            int splitIndex) {
+        this(tableBucket, partitionName, lakeSplit, splitIndex, 0);
     }
 
     public LakeSnapshotSplit(
             TableBucket tableBucket,
             @Nullable String partitionName,
             LakeSplit lakeSplit,
+            int splitIndex,
             long recordsToSplit) {
         super(tableBucket, partitionName);
         this.lakeSplit = lakeSplit;
+        this.splitIndex = splitIndex;
         this.recordsToSplit = recordsToSplit;
     }
 
@@ -55,14 +62,20 @@ public class LakeSnapshotSplit extends SourceSplitBase {
         return recordsToSplit;
     }
 
+    public int getSplitIndex() {
+        return splitIndex;
+    }
+
     @Override
     public String splitId() {
         return toSplitId(
-                "lake-snapshot-",
-                new TableBucket(
-                        tableBucket.getTableId(),
-                        tableBucket.getPartitionId(),
-                        lakeSplit.bucket()));
+                        "lake-snapshot-",
+                        new TableBucket(
+                                tableBucket.getTableId(),
+                                tableBucket.getPartitionId(),
+                                lakeSplit.bucket()))
+                + "-"
+                + splitIndex;
     }
 
     @Override
@@ -74,4 +87,21 @@ public class LakeSnapshotSplit extends SourceSplitBase {
     public byte splitKind() {
         return LAKE_SNAPSHOT_SPLIT_KIND;
     }
+
+    @Override
+    public String toString() {
+        return "LakeSnapshotSplit{"
+                + "lakeSplit="
+                + lakeSplit
+                + ", recordsToSplit="
+                + recordsToSplit
+                + ", splitIndex="
+                + splitIndex
+                + ", tableBucket="
+                + tableBucket
+                + ", partitionName='"
+                + partitionName
+                + '\''
+                + '}';
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
index e81358eae..399601b99 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/state/LakeSnapshotSplitState.java
@@ -43,6 +43,7 @@ public class LakeSnapshotSplitState extends SourceSplitState {
                 split.getTableBucket(),
                 split.getPartitionName(),
                 split.getLakeSplit(),
+                split.getSplitIndex(),
                 recordsToSplit);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
index 375d0540f..9687b5efc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java
@@ -161,6 +161,7 @@ public class FlinkSource<OUT>
                 splitEnumeratorContext,
                 sourceEnumeratorState.getAssignedBuckets(),
                 sourceEnumeratorState.getAssignedPartitions(),
+                sourceEnumeratorState.getRemainingHybridLakeFlussSplits(),
                 offsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
@@ -175,7 +176,7 @@ public class FlinkSource<OUT>
 
     @Override
     public SimpleVersionedSerializer<SourceEnumeratorState> 
getEnumeratorCheckpointSerializer() {
-        return FlussSourceEnumeratorStateSerializer.INSTANCE;
+        return new FlussSourceEnumeratorStateSerializer(lakeSource);
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
index e63d286d0..867ff442f 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java
@@ -111,6 +111,8 @@ public class FlinkSourceEnumerator
     /** buckets that have been assigned to readers. */
     private final Set<TableBucket> assignedTableBuckets;
 
+    @Nullable private List<SourceSplitBase> pendingHybridLakeFlussSplits;
+
     private final long scanPartitionDiscoveryIntervalMs;
 
     private final boolean streaming;
@@ -168,7 +170,7 @@ public class FlinkSourceEnumerator
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
             List<FieldEqual> partitionFilters,
-            LakeSource<LakeSplit> lakeSource) {
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this(
                 tablePath,
                 flussConf,
@@ -177,6 +179,7 @@ public class FlinkSourceEnumerator
                 context,
                 Collections.emptySet(),
                 Collections.emptyMap(),
+                null,
                 startingOffsetsInitializer,
                 scanPartitionDiscoveryIntervalMs,
                 streaming,
@@ -192,6 +195,7 @@ public class FlinkSourceEnumerator
             SplitEnumeratorContext<SourceSplitBase> context,
             Set<TableBucket> assignedTableBuckets,
             Map<Long, String> assignedPartitions,
+            List<SourceSplitBase> pendingHybridLakeFlussSplits,
             OffsetsInitializer startingOffsetsInitializer,
             long scanPartitionDiscoveryIntervalMs,
             boolean streaming,
@@ -206,6 +210,10 @@ public class FlinkSourceEnumerator
         this.assignedTableBuckets = new HashSet<>(assignedTableBuckets);
         this.startingOffsetsInitializer = startingOffsetsInitializer;
         this.assignedPartitions = new HashMap<>(assignedPartitions);
+        this.pendingHybridLakeFlussSplits =
+                pendingHybridLakeFlussSplits == null
+                        ? null
+                        : new LinkedList<>(pendingHybridLakeFlussSplits);
         this.scanPartitionDiscoveryIntervalMs = 
scanPartitionDiscoveryIntervalMs;
         this.streaming = streaming;
         this.partitionFilters = checkNotNull(partitionFilters);
@@ -230,22 +238,29 @@ public class FlinkSourceEnumerator
         }
 
         if (isPartitioned) {
-            if (streaming && scanPartitionDiscoveryIntervalMs > 0) {
-                // should do partition discovery
-                LOG.info(
-                        "Starting the FlussSourceEnumerator for table {} "
-                                + "with new partition discovery interval of {} 
ms.",
-                        tablePath,
-                        scanPartitionDiscoveryIntervalMs);
-                // discover new partitions and handle new partitions
-                context.callAsync(
-                        this::listPartitions,
-                        this::checkPartitionChanges,
-                        0,
-                        scanPartitionDiscoveryIntervalMs);
-            } else {
-                if (!streaming) {
-                    startInBatchMode();
+            if (streaming) {
+                if (lakeSource != null) {
+                    // we'll need to consider lake splits
+                    List<SourceSplitBase> hybridLakeFlussSplits = 
generateHybridLakeFlussSplits();
+                    if (hybridLakeFlussSplits != null) {
+                        // handle hybrid lake fluss splits firstly
+                        handleSplitsAdd(hybridLakeFlussSplits, null);
+                    }
+                }
+
+                if (scanPartitionDiscoveryIntervalMs > 0) {
+                    // should do partition discovery
+                    LOG.info(
+                            "Starting the FlussSourceEnumerator for table {} "
+                                    + "with new partition discovery interval 
of {} ms.",
+                            tablePath,
+                            scanPartitionDiscoveryIntervalMs);
+                    // discover new partitions and handle new partitions
+                    context.callAsync(
+                            this::listPartitions,
+                            this::checkPartitionChanges,
+                            0,
+                            scanPartitionDiscoveryIntervalMs);
                 } else {
                     // just call once
                     LOG.info(
@@ -253,21 +268,30 @@ public class FlinkSourceEnumerator
                             tablePath);
                     context.callAsync(this::listPartitions, 
this::checkPartitionChanges);
                 }
+            } else {
+                startInBatchMode();
             }
-
         } else {
-            if (!streaming) {
-                startInBatchMode();
+            if (streaming) {
+                startInStreamModeForNonPartitionedTable();
             } else {
-                // init bucket splits and assign
-                context.callAsync(this::initNonPartitionedSplits, 
this::handleSplitsAdd);
+                startInBatchMode();
             }
         }
     }
 
     private void startInBatchMode() {
         if (lakeEnabled) {
-            context.callAsync(this::getLakeSplit, this::handleSplitsAdd);
+            context.callAsync(
+                    () -> {
+                        List<SourceSplitBase> splits = 
generateHybridLakeFlussSplits();
+                        if (splits == null) {
+                            throw new UnsupportedOperationException(
+                                    "Currently, Batch mode can only be 
supported if one lake snapshot exists for the table.");
+                        }
+                        return splits;
+                    },
+                    this::handleSplitsAdd);
         } else {
             throw new UnsupportedOperationException(
                     String.format(
@@ -276,6 +300,26 @@ public class FlinkSourceEnumerator
         }
     }
 
+    private void startInStreamModeForNonPartitionedTable() {
+        if (lakeSource != null) {
+            context.callAsync(
+                    () -> {
+                        // firstly, try to generate hybrid lake splits,
+                        List<SourceSplitBase> splits = 
generateHybridLakeFlussSplits();
+                        // splits is null,
+                        // we'll fall back to normal fluss splits generation 
logic
+                        if (splits == null) {
+                            splits = this.initNonPartitionedSplits();
+                        }
+                        return splits;
+                    },
+                    this::handleSplitsAdd);
+        } else {
+            // init bucket splits and assign
+            context.callAsync(this::initNonPartitionedSplits, 
this::handleSplitsAdd);
+        }
+    }
+
     private List<SourceSplitBase> initNonPartitionedSplits() {
         if (hasPrimaryKey && startingOffsetsInitializer instanceof 
SnapshotOffsetsInitializer) {
             // get the table snapshot info
@@ -294,6 +338,9 @@ public class FlinkSourceEnumerator
     }
 
     private Set<PartitionInfo> listPartitions() {
+        if (closed) {
+            return Collections.emptySet();
+        }
         try {
             List<PartitionInfo> partitionInfos = 
flussAdmin.listPartitionInfos(tablePath).get();
             partitionInfos = applyPartitionFilter(partitionInfos);
@@ -512,17 +559,30 @@ public class FlinkSourceEnumerator
         return splits;
     }
 
-    private List<SourceSplitBase> getLakeSplit() throws Exception {
-        LakeSplitGenerator lakeSplitGenerator =
-                new LakeSplitGenerator(
-                        tableInfo,
-                        flussAdmin,
-                        lakeSource,
-                        bucketOffsetsRetriever,
-                        stoppingOffsetsInitializer,
-                        tableInfo.getNumBuckets(),
-                        this::listPartitions);
-        return lakeSplitGenerator.generateHybridLakeSplits();
+    @Nullable
+    private List<SourceSplitBase> generateHybridLakeFlussSplits() {
+        // still have pending lake fluss splits,
+        // should be restored from checkpoint, shouldn't
+        // list splits again
+        if (pendingHybridLakeFlussSplits != null) {
+            return pendingHybridLakeFlussSplits;
+        }
+        try {
+            LakeSplitGenerator lakeSplitGenerator =
+                    new LakeSplitGenerator(
+                            tableInfo,
+                            flussAdmin,
+                            lakeSource,
+                            bucketOffsetsRetriever,
+                            stoppingOffsetsInitializer,
+                            tableInfo.getNumBuckets(),
+                            this::listPartitions);
+            pendingHybridLakeFlussSplits = 
lakeSplitGenerator.generateHybridLakeFlussSplits();
+            return pendingHybridLakeFlussSplits;
+        } catch (Exception e) {
+            LOG.error("Failed to get hybrid lake fluss splits, won't take 
splits in lake.", e);
+        }
+        return null;
     }
 
     private boolean ignoreTableBucket(TableBucket tableBucket) {
@@ -619,6 +679,16 @@ public class FlinkSourceEnumerator
                             TableBucket tableBucket = split.getTableBucket();
                             assignedTableBuckets.add(tableBucket);
 
+                            if (pendingHybridLakeFlussSplits != null) {
+                                // removed from the 
pendingHybridLakeFlussSplits
+                                // since this split already be assigned
+                                pendingHybridLakeFlussSplits.removeIf(
+                                        hybridLakeFlussSplit ->
+                                                hybridLakeFlussSplit
+                                                        .splitId()
+                                                        
.equals(split.splitId()));
+                            }
+
                             if (isPartitioned) {
                                 long partitionId =
                                         checkNotNull(
@@ -755,7 +825,8 @@ public class FlinkSourceEnumerator
     @Override
     public SourceEnumeratorState snapshotState(long checkpointId) {
         final SourceEnumeratorState enumeratorState =
-                new SourceEnumeratorState(assignedTableBuckets, 
assignedPartitions);
+                new SourceEnumeratorState(
+                        assignedTableBuckets, assignedPartitions, 
pendingHybridLakeFlussSplits);
         LOG.debug("Source Checkpoint is {}", enumeratorState);
         return enumeratorState;
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
index b20fc1757..24a1a11bd 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/initializer/NoStoppingOffsetsInitializer.java
@@ -20,8 +20,8 @@ package org.apache.fluss.flink.source.enumerator.initializer;
 import javax.annotation.Nullable;
 
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * An implementation of {@link OffsetsInitializer} which does not initialize 
anything.
@@ -37,6 +37,6 @@ public class NoStoppingOffsetsInitializer implements 
OffsetsInitializer {
             @Nullable String partitionName,
             Collection<Integer> buckets,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever) {
-        return Collections.emptyMap();
+        return buckets.stream().collect(Collectors.toMap(x -> x, x -> 
Long.MAX_VALUE));
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
index 559b7e35e..512441c9e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/FlussSourceEnumeratorStateSerializer.java
@@ -17,15 +17,23 @@
 
 package org.apache.fluss.flink.source.state;
 
+import org.apache.fluss.flink.source.split.SourceSplitBase;
+import org.apache.fluss.flink.source.split.SourceSplitSerializer;
+import org.apache.fluss.lake.source.LakeSource;
+import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TableBucket;
 
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -33,8 +41,7 @@ import java.util.Set;
 public class FlussSourceEnumeratorStateSerializer
         implements SimpleVersionedSerializer<SourceEnumeratorState> {
 
-    public static final FlussSourceEnumeratorStateSerializer INSTANCE =
-            new FlussSourceEnumeratorStateSerializer();
+    @Nullable private final LakeSource<LakeSplit> lakeSource;
 
     private static final int VERSION_0 = 0;
     private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
@@ -42,6 +49,10 @@ public class FlussSourceEnumeratorStateSerializer
 
     private static final int CURRENT_VERSION = VERSION_0;
 
+    public FlussSourceEnumeratorStateSerializer(LakeSource<LakeSplit> 
lakeSource) {
+        this.lakeSource = lakeSource;
+    }
+
     @Override
     public int getVersion() {
         return CURRENT_VERSION;
@@ -73,6 +84,10 @@ public class FlussSourceEnumeratorStateSerializer
             out.writeUTF(entry.getValue());
         }
 
+        if (lakeSource != null) {
+            serializeRemainingHybridLakeFlussSplits(out, state);
+        }
+
         final byte[] result = out.getCopyOfBuffer();
         out.clear();
         return result;
@@ -107,6 +122,56 @@ public class FlussSourceEnumeratorStateSerializer
             String partition = in.readUTF();
             assignedPartitions.put(partitionId, partition);
         }
-        return new SourceEnumeratorState(assignedBuckets, assignedPartitions);
+
+        List<SourceSplitBase> remainingHybridLakeFlussSplits = null;
+        if (lakeSource != null) {
+            // todo: add a ut for serialize remaining hybrid lake fluss splits
+            remainingHybridLakeFlussSplits = 
deserializeRemainingHybridLakeFlussSplits(in);
+        }
+
+        return new SourceEnumeratorState(
+                assignedBuckets, assignedPartitions, 
remainingHybridLakeFlussSplits);
+    }
+
+    private void serializeRemainingHybridLakeFlussSplits(
+            final DataOutputSerializer out, SourceEnumeratorState state) 
throws IOException {
+        List<SourceSplitBase> remainingHybridLakeFlussSplits =
+                state.getRemainingHybridLakeFlussSplits();
+        if (remainingHybridLakeFlussSplits != null) {
+            // write that hybrid lake fluss splits is not null
+            out.writeBoolean(true);
+            out.writeInt(remainingHybridLakeFlussSplits.size());
+            SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            for (SourceSplitBase split : remainingHybridLakeFlussSplits) {
+                byte[] serializeBytes = sourceSplitSerializer.serialize(split);
+                out.writeInt(serializeBytes.length);
+                out.write(serializeBytes);
+            }
+
+        } else {
+            // write that hybrid lake fluss splits is null
+            out.writeBoolean(false);
+        }
+    }
+
+    @Nullable
+    private List<SourceSplitBase> deserializeRemainingHybridLakeFlussSplits(
+            final DataInputDeserializer in) throws IOException {
+        if (in.readBoolean()) {
+            int numSplits = in.readInt();
+            List<SourceSplitBase> splits = new ArrayList<>(numSplits);
+            SourceSplitSerializer sourceSplitSerializer = new 
SourceSplitSerializer(lakeSource);
+            for (int i = 0; i < numSplits; i++) {
+                int splitSizeInBytes = in.readInt();
+                byte[] splitBytes = new byte[splitSizeInBytes];
+                in.readFully(splitBytes);
+                splits.add(
+                        sourceSplitSerializer.deserialize(
+                                sourceSplitSerializer.getVersion(), 
splitBytes));
+            }
+            return splits;
+        } else {
+            return null;
+        }
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
index ab5598963..d804d069b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/state/SourceEnumeratorState.java
@@ -17,8 +17,12 @@
 
 package org.apache.fluss.flink.source.state;
 
+import org.apache.fluss.flink.source.split.SourceSplitBase;
 import org.apache.fluss.metadata.TableBucket;
 
+import javax.annotation.Nullable;
+
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -33,10 +37,17 @@ public class SourceEnumeratorState {
     // mapping from partition id to partition name
     private final Map<Long, String> assignedPartitions;
 
+    // the unassigned lake splits of a lake snapshot and the fluss splits base 
on the
+    // lake snapshot
+    @Nullable private final List<SourceSplitBase> 
remainingHybridLakeFlussSplits;
+
     public SourceEnumeratorState(
-            Set<TableBucket> assignedBuckets, Map<Long, String> 
assignedPartitions) {
+            Set<TableBucket> assignedBuckets,
+            Map<Long, String> assignedPartitions,
+            @Nullable List<SourceSplitBase> remainingHybridLakeFlussSplits) {
         this.assignedBuckets = assignedBuckets;
         this.assignedPartitions = assignedPartitions;
+        this.remainingHybridLakeFlussSplits = remainingHybridLakeFlussSplits;
     }
 
     public Set<TableBucket> getAssignedBuckets() {
@@ -47,6 +58,11 @@ public class SourceEnumeratorState {
         return assignedPartitions;
     }
 
+    @Nullable
+    public List<SourceSplitBase> getRemainingHybridLakeFlussSplits() {
+        return remainingHybridLakeFlussSplits;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 42ea11922..7d6a3b31a 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -104,6 +104,8 @@ public class TieringSourceEnumerator
     private TieringSplitGenerator splitGenerator;
     private int flussCoordinatorEpoch;
 
+    private volatile boolean closed = false;
+
     public TieringSourceEnumerator(
             Configuration flussConf,
             SplitEnumeratorContext<TieringSplit> context,
@@ -254,6 +256,9 @@ public class TieringSourceEnumerator
     }
 
     private @Nullable Tuple3<Long, Long, TablePath> 
requestTieringTableSplitsViaHeartBeat() {
+        if (closed) {
+            return null;
+        }
         Map<Long, Long> currentFinishedTableEpochs = new 
HashMap<>(this.finishedTableEpochs);
         Map<Long, Long> currentFailedTableEpochs = new 
HashMap<>(this.failedTableEpochs);
         LakeTieringHeartbeatRequest tieringHeartbeatRequest =
@@ -342,6 +347,7 @@ public class TieringSourceEnumerator
 
     @Override
     public void close() throws IOException {
+        closed = true;
         if (rpcClient != null) {
             failedTableEpochs.putAll(tieringTableEpochs);
             tieringTableEpochs.clear();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
index ff69bce92..15bb5d7d0 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/lake/LakeSplitSerializerTest.java
@@ -62,7 +62,7 @@ class LakeSplitSerializerTest {
     void testSerializeAndDeserializeLakeSnapshotSplit() throws IOException {
         // Prepare test data
         LakeSnapshotSplit originalSplit =
-                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT);
+                new LakeSnapshotSplit(tableBucket, "2025-08-18", LAKE_SPLIT, 
1);
 
         DataOutputSerializer output = new 
DataOutputSerializer(STOPPING_OFFSET);
         serializer.serialize(output, originalSplit);
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
index 4b6db5026..7218f3891 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java
@@ -353,6 +353,7 @@ class FlinkSourceEnumeratorTest extends FlinkTestBase {
                             context,
                             assignedBuckets,
                             Collections.emptyMap(),
+                            Collections.emptyList(),
                             OffsetsInitializer.earliest(),
                             DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
                             streaming,
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
index f5f39b331..6a413f80d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/state/SourceEnumeratorStateSerializerTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -37,7 +38,7 @@ class SourceEnumeratorStateSerializerTest {
     @Test
     void testPendingSplitsCheckpointSerde() throws Exception {
         FlussSourceEnumeratorStateSerializer serializer =
-                FlussSourceEnumeratorStateSerializer.INSTANCE;
+                new FlussSourceEnumeratorStateSerializer(null);
 
         Set<TableBucket> assignedBuckets =
                 new HashSet<>(Arrays.asList(new TableBucket(1, 0), new 
TableBucket(1, 4L, 1)));
@@ -46,7 +47,8 @@ class SourceEnumeratorStateSerializerTest {
         assignedPartitions.put(2L, "partition2");
 
         SourceEnumeratorState sourceEnumeratorState =
-                new SourceEnumeratorState(assignedBuckets, assignedPartitions);
+                new SourceEnumeratorState(
+                        assignedBuckets, assignedPartitions, 
Collections.emptyList());
 
         // serialize assigned buckets
         byte[] serialized = serializer.serialize(sourceEnumeratorState);
diff --git a/fluss-lake/fluss-lake-paimon/pom.xml 
b/fluss-lake/fluss-lake-paimon/pom.xml
index 57b154460..c1ee02f42 100644
--- a/fluss-lake/fluss-lake-paimon/pom.xml
+++ b/fluss-lake/fluss-lake-paimon/pom.xml
@@ -164,6 +164,7 @@
             <artifactId>fluss-flink-common</artifactId>
             <version>${project.version}</version>
             <scope>test</scope>
+            <type>test-jar</type>
         </dependency>
 
         <dependency>
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
index 426969941..0b530a722 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadLogTableITCase.java
@@ -25,6 +25,7 @@ import org.apache.fluss.row.TimestampNtz;
 
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -36,10 +37,12 @@ import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static 
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
 import static org.apache.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -122,6 +125,50 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+        // first of all, start tiering
+        JobClient jobClient = buildTieringJob(execEnv);
+
+        String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" 
: "non_partitioned");
+
+        TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+        List<Row> writtenRows = new LinkedList<>();
+        long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, 
writtenRows);
+        // wait until records has been synced
+        waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+        // now, start to read the log table, which will read paimon
+        // may read fluss or not, depends on the log offset of paimon snapshot
+        CloseableIterator<Row> actual =
+                streamTEnv.executeSql("select * from " + tableName).collect();
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+        // can database sync job
+        jobClient.cancel().get();
+
+        // write some log data again
+        writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+        // query the log table again and check the data
+        // it should read both paimon snapshot and fluss log
+        actual =
+                streamTEnv
+                        .executeSql(
+                                "select * from "
+                                        + tableName
+                                        + " /*+ 
OPTIONS('scan.partition.discovery.interval'='100ms') */")
+                        .collect();
+        if (isPartitioned) {
+            // we write to a new partition to verify partition discovery
+            writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+        }
+        assertResultsIgnoreOrder(
+                actual, 
writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+    }
+
     private long prepareLogTable(
             TablePath tablePath, int bucketNum, boolean isPartitioned, 
List<Row> flinkRows)
             throws Exception {
@@ -129,7 +176,8 @@ class FlinkUnionReadLogTableITCase extends 
FlinkUnionReadTestBase {
         long t1Id = createFullTypeLogTable(tablePath, bucketNum, 
isPartitioned);
         if (isPartitioned) {
             Map<Long, String> partitionNameById = 
waitUntilPartitions(tablePath);
-            for (String partition : partitionNameById.values()) {
+            for (String partition :
+                    
partitionNameById.values().stream().sorted().collect(Collectors.toList())) {
                 for (int i = 0; i < 3; i++) {
                     flinkRows.addAll(writeFullTypeRows(tablePath, 10, 
partition));
                 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
index 293e48107..1ad885da9 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/flink/FlinkUnionReadTestBase.java
@@ -32,6 +32,7 @@ public class FlinkUnionReadTestBase extends 
FlinkPaimonTieringTestBase {
 
     protected static final int DEFAULT_BUCKET_NUM = 1;
     StreamTableEnvironment batchTEnv;
+    StreamTableEnvironment streamTEnv;
 
     @BeforeAll
     protected static void beforeAll() {
@@ -41,6 +42,24 @@ public class FlinkUnionReadTestBase extends 
FlinkPaimonTieringTestBase {
     @BeforeEach
     public void beforeEach() {
         super.beforeEach();
+        buildBatchTEnv();
+        buildStreamTEnv();
+    }
+
+    private void buildStreamTEnv() {
+        String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+        // create table environment
+        streamTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inStreamingMode());
+        // crate catalog using sql
+        streamTEnv.executeSql(
+                String.format(
+                        "create catalog %s with ('type' = 'fluss', '%s' = 
'%s')",
+                        CATALOG_NAME, BOOTSTRAP_SERVERS.key(), 
bootstrapServers));
+        streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+        streamTEnv.executeSql("use " + DEFAULT_DB);
+    }
+
+    public void buildBatchTEnv() {
         String bootstrapServers = String.join(",", 
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
         // create table environment
         batchTEnv = StreamTableEnvironment.create(execEnv, 
EnvironmentSettings.inBatchMode());

Reply via email to