This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a4ce595cdc [flink] Support limit the max snapshot count per checkpoint (#5653)
a4ce595cdc is described below

commit a4ce595cdc43cbad68b9743663ce237c9c1aed0f
Author: WenjunMin <[email protected]>
AuthorDate: Tue May 27 07:54:08 2025 +0800

    [flink] Support limit the max snapshot count per checkpoint (#5653)
---
 .../generated/flink_connector_configuration.html   |  6 ++
 .../apache/paimon/flink/FlinkConnectorOptions.java |  7 +++
 .../source/ContinuousFileSplitEnumerator.java      | 23 +++++++-
 .../flink/source/ContinuousFileStoreSource.java    |  3 +-
 .../AlignedContinuousFileSplitEnumerator.java      |  6 +-
 .../align/AlignedContinuousFileStoreSource.java    |  3 +-
 .../source/ContinuousFileSplitEnumeratorTest.java  | 66 +++++++++++++++++++++-
 .../AlignedContinuousFileSplitEnumeratorTest.java  |  3 +-
 8 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 53f6884279..b73ffc9dff 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -140,6 +140,12 @@ under the License.
             <td>Integer</td>
             <td>If scan.infer-parallelism is true, limit the parallelism of source through this option.</td>
         </tr>
+        <tr>
+            <td><h5>scan.max-snapshot.count</h5></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Integer</td>
+            <td>The max snapshot count to scan per checkpoint. Not limited when it's negative.</td>
+        </tr>
         <tr>
             <td><h5>scan.parallelism</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a0ff463356..b39e9e57e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -191,6 +191,13 @@ public class FlinkConnectorOptions {
                             "How many splits should assign to subtask per batch in StaticFileStoreSplitEnumerator "
                                     + "to avoid exceed `akka.framesize` limit.");
 
+    public static final ConfigOption<Integer> SCAN_MAX_SNAPSHOT_COUNT =
+            key("scan.max-snapshot.count")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription(
+                            "The max snapshot count to scan per checkpoint. Not limited when it's negative.");
+
     public static final ConfigOption<SplitAssignMode> SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE =
             key("scan.split-enumerator.mode")
                     .enumType(SplitAssignMode.class)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 56e19dcf62..79597d0cd2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -84,6 +84,10 @@ public class ContinuousFileSplitEnumerator
 
     private boolean stopTriggerScan = false;
 
+    private long handledSnapshotCount = 0;
+
+    private final int maxSnapshotCount;
+
     public ContinuousFileSplitEnumerator(
             SplitEnumeratorContext<FileStoreSourceSplit> context,
             Collection<FileStoreSourceSplit> remainSplits,
@@ -92,7 +96,8 @@ public class ContinuousFileSplitEnumerator
             StreamTableScan scan,
             BucketMode bucketMode,
             int splitMaxPerTask,
-            boolean shuffleBucketWithPartition) {
+            boolean shuffleBucketWithPartition,
+            int maxSnapshotCount) {
         checkArgument(discoveryInterval > 0L);
         this.context = checkNotNull(context);
         this.nextSnapshotId = nextSnapshotId;
@@ -107,6 +112,7 @@ public class ContinuousFileSplitEnumerator
 
         this.consumerProgressCalculator =
                 new ConsumerProgressCalculator(context.currentParallelism());
+        this.maxSnapshotCount = maxSnapshotCount;
     }
 
     @VisibleForTesting
@@ -189,6 +195,7 @@ public class ContinuousFileSplitEnumerator
         consumerProgressCalculator
                 .notifyCheckpointComplete(checkpointId)
                 .ifPresent(scan::notifyCheckpointComplete);
+        handledSnapshotCount = 0;
     }
 
     // ------------------------------------------------------------------------
@@ -200,8 +207,22 @@ public class ContinuousFileSplitEnumerator
         if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
             return Optional.empty();
         }
+        if (maxSnapshotCount > 0 && handledSnapshotCount >= maxSnapshotCount) {
+            LOG.debug(
+                    "There is {} in-flight snapshot, pending to scan next snapshot.",
+                    handledSnapshotCount);
+            return Optional.empty();
+        }
+
         TableScan.Plan plan = scan.plan();
         Long nextSnapshotId = scan.checkpoint();
+        if (nextSnapshotId != null && !plan.splits().isEmpty()) {
+            if (this.nextSnapshotId == null) {
+                handledSnapshotCount++;
+            } else if (!nextSnapshotId.equals(this.nextSnapshotId)) {
+                handledSnapshotCount++;
+            }
+        }
         return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index db39f90455..879f262bbd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -112,6 +112,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
                 scan,
                 bucketMode,
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
+                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+                options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 4ed4461dd7..857c1b7ab7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -95,7 +95,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
             BucketMode bucketMode,
             long alignTimeout,
             int splitPerTaskMax,
-            boolean shuffleBucketWithPartition) {
+            boolean shuffleBucketWithPartition,
+            int maxSnapshotCount) {
         super(
                 context,
                 remainSplits,
@@ -104,7 +105,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
                 scan,
                 bucketMode,
                 splitPerTaskMax,
-                shuffleBucketWithPartition);
+                shuffleBucketWithPartition,
+                maxSnapshotCount);
         this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
         this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
         this.nextSnapshotId = nextSnapshotId;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 1b3e7b5b19..8904720c90 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -85,6 +85,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
                 bucketMode,
                 options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
                 options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
+                options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+                options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 6f05e5dd7d..e3912cc707 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -802,6 +802,56 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
         Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(15 * 2);
     }
 
+    @Test
+    public void testEnumeratorSnapshotMax() throws Exception {
+        final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+                getSplitEnumeratorContext(2);
+
+        TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+        StreamTableScan scan = new MockScan(results);
+        ContinuousFileSplitEnumerator enumerator =
+                new Builder()
+                        .setSplitEnumeratorContext(context)
+                        .setInitialSplits(Collections.emptyList())
+                        .setDiscoveryInterval(1)
+                        .setScan(scan)
+                        .withBucketMode(BucketMode.BUCKET_UNAWARE)
+                        .withMaxSnapshotCount(1)
+                        .build();
+        enumerator.start();
+
+        long snapshot = 0;
+        List<DataSplit> splits = new ArrayList<>();
+        // splits 1
+        splits.add(createDataSplit(snapshot++, 0, Collections.emptyList()));
+        results.put(1L, new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+
+        // splits 2
+        splits = new ArrayList<>();
+        splits.add(createDataSplit(snapshot++, 0, Collections.emptyList()));
+        results.put(2L, new DataFilePlan(splits));
+        context.triggerAllActions();
+
+        // The snapshot 2 is pending to scan.
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+
+        // consumed splits 1
+        enumerator.handleSplitRequest(0, "test");
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(0);
+        context.triggerAllActions();
+
+        // no new snapshot is scanned, because checkpoint is not completed.
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(0);
+
+        enumerator.notifyCheckpointComplete(1);
+        context.triggerAllActions();
+        Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+        Assertions.assertThat(enumerator.nextSnapshotId).isEqualTo(3);
+    }
+
     private void triggerCheckpointAndComplete(
             ContinuousFileSplitEnumerator enumerator, long checkpointId) throws Exception {
         enumerator.snapshotState(checkpointId);
@@ -850,6 +900,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
 
         private StreamTableScan scan;
         private BucketMode bucketMode = BucketMode.HASH_FIXED;
+        private int maxSnapshotCount = -1;
 
         public Builder setSplitEnumeratorContext(
                 SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -877,9 +928,22 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
             return this;
         }
 
+        public Builder withMaxSnapshotCount(int maxSnapshotCount) {
+            this.maxSnapshotCount = maxSnapshotCount;
+            return this;
+        }
+
         public ContinuousFileSplitEnumerator build() {
             return new ContinuousFileSplitEnumerator(
-                    context, initialSplits, null, discoveryInterval, scan, bucketMode, 10, false);
+                    context,
+                    initialSplits,
+                    null,
+                    discoveryInterval,
+                    scan,
+                    bucketMode,
+                    10,
+                    false,
+                    maxSnapshotCount);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index a5edc28040..a3c26ce80d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -253,7 +253,8 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
                     bucketMode,
                     timeout,
                     10,
-                    false);
+                    false,
+                    -1);
         }
     }
 }

Reply via email to