This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed40bd2471 [hotfix] Minor refactor for Flink Dedicated Split Gen Source
ed40bd2471 is described below

commit ed40bd247170aa515371be589494e99651b33b36
Author: JingsongLi <[email protected]>
AuthorDate: Thu Jun 12 15:48:33 2025 +0800

    [hotfix] Minor refactor for Flink Dedicated Split Gen Source
---
 docs/content/append-table/query-performance.md          | 11 -----------
 docs/content/flink/sql-query.md                         | 17 +++++++++++++++++
 docs/content/primary-key-table/query-performance.md     | 12 ------------
 .../apache/paimon/flink/source/FlinkSourceBuilder.java  |  7 ++++---
 4 files changed, 21 insertions(+), 26 deletions(-)

diff --git a/docs/content/append-table/query-performance.md 
b/docs/content/append-table/query-performance.md
index d488e4e999..e2128bbd89 100644
--- a/docs/content/append-table/query-performance.md
+++ b/docs/content/append-table/query-performance.md
@@ -75,14 +75,3 @@ we use the procedure, you should config appropriate 
configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
-
-## Dedicated Split Generation
-When Paimon table snapshots contain large amount of source splits, Flink jobs 
reading from this table might endure long initialization time or even OOM in 
JobManagers. In this case, you can configure `'scan.dedicated-split-generation' 
= 'true'` to avoid such problem. This option would enable executing the source 
split generation process in a dedicated subtask that runs on TaskManager, 
instead of in the source coordinator on the JobManager.
-
-Note that this feature could have some side effects on your Flink jobs. For 
example:
-
-1. It will change the DAG of the flink job, thus breaking checkpoint 
compatibility if enabled on an existing job.
-2. It may lead to the Flink AdaptiveBatchScheduler inferring a small 
parallelism for the source reader operator. you can configure 
`scan.infer-parallelism` to avoid this possible drawback.
-3. The failover strategy of the Flink job would be forced into global failover 
instead of regional failover, given that the dedicated source split generation 
task would be connected to all downstream subtasks.
-
-So please make sure these side effects are acceptable to you before enabling 
it.
diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index 85940a36e8..c974527358 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -288,3 +288,20 @@ SELECT * FROM orders WHERE order_id=29495;
 
 SELECT * FROM orders WHERE catalog_id=1025 OR order_id=29495;
 ```
+
+## Dedicated Split Generation
+
+When Paimon table snapshots contain large amount of source splits, Flink jobs 
reading from this table might endure long
+initialization time or even OOM in JobManagers. In this case, you can 
configure `'scan.dedicated-split-generation' = 'true'`
+to avoid such problem. This option would enable executing the source split 
generation process in a dedicated subtask
+that runs on TaskManager, instead of in the source coordinator on the 
JobManager.
+
+Note that this feature could have some side effects on your Flink jobs. For 
example:
+
+1. It will change the DAG of the flink job, thus breaking checkpoint 
compatibility if enabled on an existing job.
+2. It may lead to the Flink AdaptiveBatchScheduler inferring a small 
parallelism for the source reader operator. you can
+   configure `scan.infer-parallelism` to avoid this possible drawback.
+3. The failover strategy of the Flink job would be forced into global failover 
instead of regional failover, given that
+   the dedicated source split generation task would be connected to all 
downstream subtasks.
+
+So please make sure these side effects are acceptable to you before enabling 
it.
diff --git a/docs/content/primary-key-table/query-performance.md 
b/docs/content/primary-key-table/query-performance.md
index ed0e357fc6..d27c4868fe 100644
--- a/docs/content/primary-key-table/query-performance.md
+++ b/docs/content/primary-key-table/query-performance.md
@@ -73,15 +73,3 @@ we use the procedure, you should config appropriate 
configurations in target tab
 `file-index.<filter-type>.columns` to the table.
 
 How to invoke: see [flink procedures]({{< ref "flink/procedures#procedures" 
>}}) 
-
-## Dedicated Split Generation
-When Paimon table snapshots contain large amount of source splits, Flink jobs 
reading from this table might endure long initialization time or even OOM in 
JobManagers. In this case, you can configure `'scan.dedicated-split-generation' 
= 'true'` to avoid such problem. This option would enable executing the source 
split generation process in a dedicated subtask that runs on TaskManager, 
instead of in the source coordinator on the JobManager.
-
-Note that this feature could have some side effects on your Flink jobs. For 
example:
-
-1. It will change the DAG of the flink job, thus breaking checkpoint 
compatibility if enabled on an existing job.
-2. It may lead to the Flink AdaptiveBatchScheduler inferring a small 
parallelism for the source reader operator. you can configure 
`scan.infer-parallelism` to avoid this possible drawback.
-3. The failover strategy of the Flink job would be forced into global failover 
instead of regional failover, given that the dedicated source split generation 
task would be connected to all downstream subtasks.
-
-So please make sure these side effects are acceptable to you before enabling 
it.
-
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index ac776b7cd3..44b5790472 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -78,6 +78,7 @@ import static 
org.apache.paimon.utils.Preconditions.checkState;
  * @since 0.8
  */
 public class FlinkSourceBuilder {
+
     private static final String SOURCE_NAME = "Source";
 
     private final Table table;
@@ -295,7 +296,7 @@ public class FlinkSourceBuilder {
 
         if (sourceBounded) {
             if 
(conf.get(FlinkConnectorOptions.SCAN_DEDICATED_SPLIT_GENERATION)) {
-                return buildContinuousStreamOperator(true);
+                return buildDedicatedSplitGenSource(true);
             }
             return buildStaticFileSource();
         }
@@ -328,14 +329,14 @@ public class FlinkSourceBuilder {
             } else if (conf.contains(CoreOptions.CONSUMER_ID)
                     && conf.get(CoreOptions.CONSUMER_CONSISTENCY_MODE)
                             == CoreOptions.ConsumerMode.EXACTLY_ONCE) {
-                return buildContinuousStreamOperator(false);
+                return buildDedicatedSplitGenSource(false);
             } else {
                 return buildContinuousFileSource();
             }
         }
     }
 
-    private DataStream<RowData> buildContinuousStreamOperator(boolean 
isBounded) {
+    private DataStream<RowData> buildDedicatedSplitGenSource(boolean 
isBounded) {
         DataStream<RowData> dataStream;
         if (limit != null && !isBounded) {
             throw new IllegalArgumentException(

Reply via email to