This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.2
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 95322af7480021215efc5c953c301842c08163f1
Author: wangwj <[email protected]>
AuthorDate: Thu Jun 12 15:44:56 2025 +0800

    [flink] Add 'scan.dedicated-split-generation' to Flink Batch Source (#5715)
---
 docs/content/append-table/query-performance.md     | 11 ++++
 .../content/primary-key-table/query-performance.md | 12 +++++
 .../generated/flink_connector_configuration.html   | 18 ++++---
 .../apache/paimon/flink/FlinkConnectorOptions.java | 15 ++++--
 .../postpone/PostponeBucketCompactSplitSource.java |  2 +-
 .../flink/source/ContinuousFileStoreSource.java    |  2 +-
 .../paimon/flink/source/FlinkSourceBuilder.java    | 16 +++---
 .../align/AlignedContinuousFileStoreSource.java    |  2 +-
 .../flink/source/operator/MonitorSource.java       | 38 ++++++++++----
 .../paimon/flink/source/operator/ReadOperator.java | 25 ++++++++-
 .../apache/paimon/flink/utils/TableScanUtils.java  | 16 ++++--
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 38 ++++++++++++++
 .../flink/source/operator/OperatorSourceTest.java  | 60 +++++++++++++++++++---
 13 files changed, 216 insertions(+), 39 deletions(-)

diff --git a/docs/content/append-table/query-performance.md 
b/docs/content/append-table/query-performance.md
index e2128bbd89..d488e4e999 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -75,3 +75,14 @@ we use the procedure, you should config appropriate 
configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
+
+## Dedicated Split Generation
+When Paimon table snapshots contain large amount of source splits, Flink jobs 
reading from this table might endure long initialization time or even OOM in 
JobManagers. In this case, you can configure `'scan.dedicated-split-generation' 
= 'true'` to avoid such problem. This option would enable executing the source 
split generation process in a dedicated subtask that runs on TaskManager, 
instead of in the source coordinator on the JobManager.
+
+Note that this feature could have some side effects on your Flink jobs. For 
example:
+
+1. It will change the DAG of the flink job, thus breaking checkpoint 
compatibility if enabled on an existing job.
+2. It may lead to the Flink AdaptiveBatchScheduler inferring a small 
parallelism for the source reader operator. you can configure 
`scan.infer-parallelism` to avoid this possible drawback.
+3. The failover strategy of the Flink job would be forced into global failover 
instead of regional failover, given that the dedicated source split generation 
task would be connected to all downstream subtasks.
+
+So please make sure these side effects are acceptable to you before enabling 
it.
diff --git a/docs/content/primary-key-table/query-performance.md 
b/docs/content/primary-key-table/query-performance.md
index d27c4868fe..ed0e357fc6 100644
--- a/docs/content/primary-key-table/query-performance.md
+++ b/docs/content/primary-key-table/query-performance.md
@@ -73,3 +73,15 @@ we use the procedure, you should config appropriate 
configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
+
+## Dedicated Split Generation
+When Paimon table snapshots contain large amount of source splits, Flink jobs 
reading from this table might endure long initialization time or even OOM in 
JobManagers. In this case, you can configure `'scan.dedicated-split-generation' 
= 'true'` to avoid such problem. This option would enable executing the source 
split generation process in a dedicated subtask that runs on TaskManager, 
instead of in the source coordinator on the JobManager.
+
+Note that this feature could have some side effects on your Flink jobs. For 
example:
+
+1. It will change the DAG of the flink job, thus breaking checkpoint 
compatibility if enabled on an existing job.
+2. It may lead to the Flink AdaptiveBatchScheduler inferring a small 
parallelism for the source reader operator. you can configure 
`scan.infer-parallelism` to avoid this possible drawback.
+3. The failover strategy of the Flink job would be forced into global failover 
instead of regional failover, given that the dedicated source split generation 
task would be connected to all downstream subtasks.
+
+So please make sure these side effects are acceptable to you before enabling 
it.
+
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 8b80512fc4..1a67d4e60f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -122,12 +122,24 @@ under the License.
             <td>Boolean</td>
             <td>If true, it will add a compact coordinator and worker operator 
after the writer operator,in order to compact several changelog files (for 
primary key tables) or newly created data files (for unaware bucket tables) 
from the same partition into large ones, which can decrease the number of small 
files.</td>
         </tr>
+        <tr>
+            <td><h5>read.shuffle-bucket-with-partition</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether shuffle by partition and bucket when read.</td>
+        </tr>
         <tr>
             <td><h5>scan.bounded</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Boolean</td>
             <td>Bounded mode for Paimon consumer. By default, Paimon 
automatically selects bounded mode based on the mode of the Flink job.</td>
         </tr>
+        <tr>
+            <td><h5>scan.dedicated-split-generation</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, the split generation process would be performed 
during runtime on a Flink task, instead of on the JobManager during 
initialization phase.</td>
+        </tr>
         <tr>
             <td><h5>scan.infer-parallelism</h5></td>
             <td style="word-wrap: break-word;">true</td>
@@ -314,12 +326,6 @@ under the License.
             <td>String</td>
             <td>Set the uid suffix for the source operators. After setting, 
the uid format is ${UID_PREFIX}_${TABLE_NAME}_${USER_UID_SUFFIX}. If the uid 
suffix is not set, flink will automatically generate the operator uid, which 
may be incompatible when the topology changes.</td>
         </tr>
-        <tr>
-            <td><h5>streaming-read.shuffle-bucket-with-partition</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Whether shuffle by partition and bucket when streaming 
read.</td>
-        </tr>
         <tr>
             <td><h5>unaware-bucket.compaction.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 9c0e9d6191..ed8d9a7ac8 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -224,12 +224,12 @@ public class FlinkConnectorOptions {
                                     + " Note: This is dangerous and is likely 
to cause data errors if downstream"
                                     + " is used to calculate aggregation and 
the input is not complete changelog.");
 
-    public static final ConfigOption<Boolean> 
STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION =
-            key("streaming-read.shuffle-bucket-with-partition")
+    public static final ConfigOption<Boolean> 
READ_SHUFFLE_BUCKET_WITH_PARTITION =
+            key("read.shuffle-bucket-with-partition")
                     .booleanType()
                     .defaultValue(true)
-                    .withDescription(
-                            "Whether shuffle by partition and bucket when 
streaming read.");
+                    
.withFallbackKeys("streaming-read.shuffle-bucket-with-partition")
+                    .withDescription("Whether shuffle by partition and bucket 
when read.");
 
     /**
      * Weight of writer buffer in managed memory, Flink will compute the 
memory size for writer
@@ -508,6 +508,13 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Bucket number for the partitions compacted for 
the first time in postpone bucket tables.");
 
+    public static final ConfigOption<Boolean> SCAN_DEDICATED_SPLIT_GENERATION =
+            key("scan.dedicated-split-generation")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, the split generation process would be 
performed during runtime on a Flink task, instead of on the JobManager during 
initialization phase.");
+
     public static List<ConfigOption<?>> getOptions() {
         final Field[] fields = FlinkConnectorOptions.class.getFields();
         final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
index 53bc7a50a7..833eff8120 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/postpone/PostponeBucketCompactSplitSource.java
@@ -155,7 +155,7 @@ public class PostponeBucketCompactSplitSource extends 
AbstractNonCoordinatedSour
                                         table.fullName(), partitionSpec),
                                 InternalTypeInfo.of(
                                         
LogicalTypeConversion.toLogicalType(table.rowType())),
-                                new ReadOperator(table::newRead, null)),
+                                new ReadOperator(table::newRead, null, null)),
                 source.forward()
                         .transform(
                                 "Remove new files",
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index 3587313748..2c1273e811 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -111,7 +111,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 scan,
                 unawareBucket,
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-                
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+                
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
                 options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index feb9176c85..ac776b7cd3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -294,6 +294,9 @@ public class FlinkSourceBuilder {
         }
 
         if (sourceBounded) {
+            if 
(conf.get(FlinkConnectorOptions.SCAN_DEDICATED_SPLIT_GENERATION)) {
+                return buildContinuousStreamOperator(true);
+            }
             return buildStaticFileSource();
         }
         TableScanUtils.streamingReadingValidate(table);
@@ -325,16 +328,16 @@ public class FlinkSourceBuilder {
             } else if (conf.contains(CoreOptions.CONSUMER_ID)
                     && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
                             == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
-                return buildContinuousStreamOperator();
+                return buildContinuousStreamOperator(false);
             } else {
                 return buildContinuousFileSource();
             }
         }
     }
 
-    private DataStream<RowData> buildContinuousStreamOperator() {
+    private DataStream<RowData> buildContinuousStreamOperator(boolean 
isBounded) {
         DataStream<RowData> dataStream;
-        if (limit != null) {
+        if (limit != null && !isBounded) {
             throw new IllegalArgumentException(
                     "Cannot limit streaming source, please use batch execution 
mode.");
         }
@@ -346,10 +349,11 @@ public class FlinkSourceBuilder {
                         createReadBuilder(projectedRowType()),
                         
conf.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis(),
                         watermarkStrategy == null,
-                        conf.get(
-                                
FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+                        
conf.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
                         unawareBucket,
-                        outerProject());
+                        outerProject(),
+                        isBounded,
+                        limit);
         if (parallelism != null) {
             dataStream.getTransformation().setParallelism(parallelism);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 29ff63a31c..59845901ff 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -84,7 +84,7 @@ public class AlignedContinuousFileStoreSource extends 
ContinuousFileStoreSource
                 unawareBucket,
                 
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-                
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+                
options.get(FlinkConnectorOptions.READ_SHUFFLE_BUCKET_WITH_PARTITION),
                 options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
index 228d0d5a44..d664b526ff 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorSource.java
@@ -30,6 +30,7 @@ import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
 
 import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -48,6 +49,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableMap;
@@ -59,20 +62,22 @@ import java.util.TreeMap;
  *
  * <ol>
  *   <li>Monitoring snapshots of the Paimon table.
- *   <li>Creating the {@link Split splits} corresponding to the incremental 
files
+ *   <li>Creating the {@link Split splits} corresponding to the DateFiles
  *   <li>Assigning them to downstream tasks for further processing.
  * </ol>
  *
  * <p>The splits to be read are forwarded to the downstream {@link 
ReadOperator} which can have
  * parallelism greater than one.
  *
- * <p>Currently, there are two features that rely on this monitor:
+ * <p>Currently, there are three features that rely on this monitor:
  *
  * <ol>
  *   <li>Consumer-id: rely on this source to do aligned snapshot consumption, 
and ensure that all
  *       data in a snapshot is consumed within each checkpoint.
  *   <li>Snapshot-watermark: when there is no watermark definition, the 
default Paimon table will
  *       pass the watermark recorded in the snapshot.
+ *   <li>Optimize-coordinator-memory: rely on this source to get splits on a 
single task, this can
+ *       reduce the memory pressure of source coordinator.
  * </ol>
  */
 public class MonitorSource extends AbstractNonCoordinatedSource<Split> {
@@ -84,17 +89,22 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
     private final ReadBuilder readBuilder;
     private final long monitorInterval;
     private final boolean emitSnapshotWatermark;
+    private final boolean isBounded;
 
     public MonitorSource(
-            ReadBuilder readBuilder, long monitorInterval, boolean 
emitSnapshotWatermark) {
+            ReadBuilder readBuilder,
+            long monitorInterval,
+            boolean emitSnapshotWatermark,
+            boolean isBounded) {
         this.readBuilder = readBuilder;
         this.monitorInterval = monitorInterval;
         this.emitSnapshotWatermark = emitSnapshotWatermark;
+        this.isBounded = isBounded;
     }
 
     @Override
     public Boundedness getBoundedness() {
-        return Boundedness.CONTINUOUS_UNBOUNDED;
+        return isBounded ? Boundedness.BOUNDED : 
Boundedness.CONTINUOUS_UNBOUNDED;
     }
 
     @Override
@@ -108,6 +118,7 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
         private static final String NEXT_SNAPSHOT_STATE = "NSS";
 
         private final StreamTableScan scan = readBuilder.newStreamScan();
+        private final TableScan batchScan = readBuilder.newScan();
         private final SplitListState<Long> checkpointState =
                 new SplitListState<>(CHECKPOINT_STATE, x -> Long.toString(x), 
Long::parseLong);
         private final SplitListState<Tuple2<Long, Long>> nextSnapshotState =
@@ -178,11 +189,11 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
         public InputStatus pollNext(ReaderOutput<Split> readerOutput) throws 
Exception {
             boolean isEmpty;
             try {
-                List<Split> splits = scan.plan().splits();
+                List<Split> splits = isBounded ? batchScan.plan().splits() : 
scan.plan().splits();
                 isEmpty = splits.isEmpty();
                 splits.forEach(readerOutput::collect);
 
-                if (emitSnapshotWatermark) {
+                if (emitSnapshotWatermark && !isBounded) {
                     Long watermark = scan.watermark();
                     if (watermark != null) {
                         readerOutput.emitWatermark(new Watermark(watermark));
@@ -193,6 +204,10 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
                 return InputStatus.END_OF_INPUT;
             }
 
+            if (isBounded) {
+                return InputStatus.END_OF_INPUT;
+            }
+
             if (isEmpty) {
                 Thread.sleep(monitorInterval);
             }
@@ -209,11 +224,16 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
             boolean emitSnapshotWatermark,
             boolean shuffleBucketWithPartition,
             boolean unawareBucket,
-            NestedProjectedRowData nestedProjectedRowData) {
+            NestedProjectedRowData nestedProjectedRowData,
+            boolean isBounded,
+            @Nullable Long limit) {
         SingleOutputStreamOperator<Split> singleOutputStreamOperator =
                 env.fromSource(
                                 new MonitorSource(
-                                        readBuilder, monitorInterval, 
emitSnapshotWatermark),
+                                        readBuilder,
+                                        monitorInterval,
+                                        emitSnapshotWatermark,
+                                        isBounded),
                                 WatermarkStrategy.noWatermarks(),
                                 name + "-Monitor",
                                 new JavaTypeInfo<>(Split.class))
@@ -228,7 +248,7 @@ public class MonitorSource extends 
AbstractNonCoordinatedSource<Split> {
         return sourceDataStream.transform(
                 name + "-Reader",
                 typeInfo,
-                new ReadOperator(readBuilder::newRead, 
nestedProjectedRowData));
+                new ReadOperator(readBuilder::newRead, nestedProjectedRowData, 
limit));
     }
 
     private static DataStream<Split> shuffleUnawareBucket(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index 685b324088..a9b9767041 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -36,6 +36,8 @@ import 
org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -47,6 +49,8 @@ import javax.annotation.Nullable;
 public class ReadOperator extends AbstractStreamOperator<RowData>
         implements OneInputStreamOperator<Split, RowData> {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ReadOperator.class);
+
     private static final long serialVersionUID = 2L;
 
     private final SerializableSupplier<TableRead> readSupplier;
@@ -64,12 +68,15 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
     private transient long emitEventTimeLag = 
FileStoreSourceReaderMetrics.UNDEFINED;
     private transient long idleStartTime = FileStoreSourceReaderMetrics.ACTIVE;
     private transient Counter numRecordsIn;
+    @Nullable private final Long limit;
 
     public ReadOperator(
             SerializableSupplier<TableRead> readSupplier,
-            @Nullable NestedProjectedRowData nestedProjectedRowData) {
+            @Nullable NestedProjectedRowData nestedProjectedRowData,
+            @Nullable Long limit) {
         this.readSupplier = readSupplier;
         this.nestedProjectedRowData = nestedProjectedRowData;
+        this.limit = limit;
     }
 
     @Override
@@ -98,6 +105,10 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
 
     @Override
     public void processElement(StreamRecord<Split> record) throws Exception {
+        if (reachLimit()) {
+            return;
+        }
+
         Split split = record.getValue();
         // update metric when reading a new split
         long eventTime =
@@ -122,6 +133,10 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
                     numRecordsIn.inc();
                 }
 
+                if (reachLimit()) {
+                    return;
+                }
+
                 reuseRow.replace(iterator.next());
                 if (nestedProjectedRowData == null) {
                     reuseRecord.replace(reuseRow);
@@ -144,6 +159,14 @@ public class ReadOperator extends 
AbstractStreamOperator<RowData>
         }
     }
 
+    private boolean reachLimit() {
+        if (limit != null && numRecordsIn.getCount() > limit) {
+            LOG.info("Reader {} reach the limit record {}.", this, limit);
+            return true;
+        }
+        return false;
+    }
+
     private void idlingStarted() {
         if (!isIdling()) {
             idleStartTime = System.currentTimeMillis();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
index 30b7bbdd5d..ca901ad1bf 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableScanUtils.java
@@ -30,12 +30,14 @@ import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
 
+import static 
org.apache.paimon.flink.FlinkConnectorOptions.SCAN_DEDICATED_SPLIT_GENERATION;
+
 /** Utility methods for {@link TableScan}, such as validating. */
 public class TableScanUtils {
 
     public static void streamingReadingValidate(Table table) {
-        CoreOptions options = CoreOptions.fromMap(table.options());
-        CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
+        CoreOptions coreOptions = CoreOptions.fromMap(table.options());
+        CoreOptions.MergeEngine mergeEngine = coreOptions.mergeEngine();
         HashMap<CoreOptions.MergeEngine, String> mergeEngineDesc =
                 new HashMap<CoreOptions.MergeEngine, String>() {
                     {
@@ -45,7 +47,7 @@ public class TableScanUtils {
                     }
                 };
         if (table.primaryKeys().size() > 0 && 
mergeEngineDesc.containsKey(mergeEngine)) {
-            if (options.changelogProducer() == 
CoreOptions.ChangelogProducer.NONE) {
+            if (coreOptions.changelogProducer() == 
CoreOptions.ChangelogProducer.NONE) {
                 throw new RuntimeException(
                         mergeEngineDesc.get(mergeEngine)
                                 + " streaming reading is not supported. You 
can use "
@@ -53,6 +55,14 @@ public class TableScanUtils {
                                 + "('input' changelog producer is also 
supported, but only returns input records.)");
             }
         }
+
+        Options options = Options.fromMap(table.options());
+        if (options.get(SCAN_DEDICATED_SPLIT_GENERATION)) {
+            throw new RuntimeException(
+                    "The option "
+                            + SCAN_DEDICATED_SPLIT_GENERATION.key()
+                            + " can only used in batch mode.");
+        }
     }
 
     /** Get snapshot id from {@link FileStoreSourceSplit}. */
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 2b846b68bc..e431b69841 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -815,4 +815,42 @@ public class BatchFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(sql("SELECT rowkind, b FROM `test_table$binlog`"))
                 .containsExactly(Row.of("+I", new String[] {"A"}));
     }
+
+    @Test
+    public void testBatchReadSourceWithSnapshot() {
+        batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222), (3, 33, 
333), (4, 44, 444)");
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.snapshot-id'='1', 'scan.dedicated-split-generation'='true') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444));
+
+        batchSql("INSERT INTO T VALUES (5, 55, 555), (6, 66, 666)");
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.dedicated-split-generation'='true') */"))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 11, 111),
+                        Row.of(2, 22, 222),
+                        Row.of(3, 33, 333),
+                        Row.of(4, 44, 444),
+                        Row.of(5, 55, 555),
+                        Row.of(6, 66, 666));
+
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.dedicated-split-generation'='true') */ limit 2"))
+                .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 
222));
+    }
+
+    @Test
+    public void testBatchReadSourceWithoutSnapshot() {
+        assertThat(
+                        batchSql(
+                                "SELECT * FROM T /*+ 
OPTIONS('scan.dedicated-split-generation'='true') */"))
+                .hasSize(0);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
index 0d14b60e0c..e4ab4ec157 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/operator/OperatorSourceTest.java
@@ -122,13 +122,26 @@ public class OperatorSourceTest {
         return result;
     }
 
+    @Test
+    public void testMonitorSourceWhenIsBoundedIsTrue() throws Exception {
+        MonitorSource source = new MonitorSource(table.newReadBuilder(), 10, 
false, true);
+        TestingSourceOperator<Split> operator =
+                (TestingSourceOperator<Split>)
+                        TestingSourceOperator.createTestOperator(
+                                source.createReader(null), 
WatermarkStrategy.noWatermarks(), false);
+        AbstractStreamOperatorTestHarness<Split> testHarness =
+                new AbstractStreamOperatorTestHarness<>(operator, 1, 1, 0);
+        testHarness.open();
+        testReadSplit(operator, () -> 1, 1, 1, 1);
+    }
+
     @Test
     public void testMonitorSource() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         // 1. run first
         OperatorSubtaskState snapshot;
         {
-            MonitorSource source = new MonitorSource(table.newReadBuilder(), 
10, false);
+            MonitorSource source = new MonitorSource(table.newReadBuilder(), 
10, false, false);
             TestingSourceOperator<Split> operator =
                     (TestingSourceOperator<Split>)
                             TestingSourceOperator.createTestOperator(
@@ -143,7 +156,7 @@ public class OperatorSourceTest {
 
         // 2. restore from state
         {
-            MonitorSource sourceCopy1 = new 
MonitorSource(table.newReadBuilder(), 10, false);
+            MonitorSource sourceCopy1 = new 
MonitorSource(table.newReadBuilder(), 10, false, false);
             TestingSourceOperator<Split> operatorCopy1 =
                     (TestingSourceOperator<Split>)
                             TestingSourceOperator.createTestOperator(
@@ -168,7 +181,7 @@ public class OperatorSourceTest {
 
         // 3. restore from consumer id
         {
-            MonitorSource sourceCopy2 = new 
MonitorSource(table.newReadBuilder(), 10, false);
+            MonitorSource sourceCopy2 = new 
MonitorSource(table.newReadBuilder(), 10, false, false);
             TestingSourceOperator<Split> operatorCopy2 =
                     (TestingSourceOperator<Split>)
                             TestingSourceOperator.createTestOperator(
@@ -184,7 +197,8 @@ public class OperatorSourceTest {
 
     @Test
     public void testReadOperator() throws Exception {
-        ReadOperator readOperator = new ReadOperator(() -> 
table.newReadBuilder().newRead(), null);
+        ReadOperator readOperator =
+                new ReadOperator(() -> table.newReadBuilder().newRead(), null, 
null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
@@ -204,9 +218,41 @@ public class OperatorSourceTest {
                         new StreamRecord<>(GenericRowData.of(2, 2, 2)));
     }
 
+    @Test
+    public void testReadOperatorWithLimit() throws Exception {
+        ReadOperator readOperator =
+                new ReadOperator(() -> table.newReadBuilder().newRead(), null, 
2L);
+        OneInputStreamOperatorTestHarness<Split, RowData> harness =
+                new OneInputStreamOperatorTestHarness<>(readOperator);
+        harness.setup(
+                InternalSerializers.create(
+                        RowType.of(new IntType(), new IntType(), new 
IntType())));
+        writeToTable(1, 1, 1);
+        writeToTable(2, 2, 2);
+        writeToTable(3, 3, 3);
+        writeToTable(4, 4, 4);
+        List<Split> splits = table.newReadBuilder().newScan().plan().splits();
+        harness.open();
+        for (Split split : splits) {
+            harness.processElement(new StreamRecord<>(split));
+        }
+        ArrayList<Object> values = new ArrayList<>(harness.getOutput());
+
+        // In ReadOperator each Split is already counted as one input record. 
But in this case it
+        // will not happen.
+        // So in this case the result values's size if 3 even if the limit is 
2.
+        // The IT case see 
BatchFileStoreITCase#testBatchReadSourceWithSnapshot.
+        assertThat(values)
+                .containsExactlyInAnyOrder(
+                        new StreamRecord<>(GenericRowData.of(1, 1, 1)),
+                        new StreamRecord<>(GenericRowData.of(2, 2, 2)),
+                        new StreamRecord<>(GenericRowData.of(3, 3, 3)));
+    }
+
     @Test
     public void testReadOperatorMetricsRegisterAndUpdate() throws Exception {
-        ReadOperator readOperator = new ReadOperator(() -> 
table.newReadBuilder().newRead(), null);
+        ReadOperator readOperator =
+                new ReadOperator(() -> table.newReadBuilder().newRead(), null, 
null);
         OneInputStreamOperatorTestHarness<Split, RowData> harness =
                 new OneInputStreamOperatorTestHarness<>(readOperator);
         harness.setup(
@@ -305,6 +351,8 @@ public class OperatorSourceTest {
                     public void emitWatermark(WatermarkEvent watermarkEvent) {}
                 };
 
+        writeToTable(a, b, c);
+
         AtomicBoolean isRunning = new AtomicBoolean(true);
         Thread runner =
                 new Thread(
@@ -320,8 +368,6 @@ public class OperatorSourceTest {
                         });
         runner.start();
 
-        writeToTable(a, b, c);
-
         Split split = queue.poll(1, TimeUnit.MINUTES);
         
assertThat(readSplit(split)).containsExactlyInAnyOrder(Arrays.asList(a, b, c));
 


Reply via email to