This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a4ce595cdc [flink] Support limit the max snapshot count per checkpoint (#5653)
a4ce595cdc is described below
commit a4ce595cdc43cbad68b9743663ce237c9c1aed0f
Author: WenjunMin <[email protected]>
AuthorDate: Tue May 27 07:54:08 2025 +0800
[flink] Support limit the max snapshot count per checkpoint (#5653)
---
.../generated/flink_connector_configuration.html | 6 ++
.../apache/paimon/flink/FlinkConnectorOptions.java | 7 +++
.../source/ContinuousFileSplitEnumerator.java | 23 +++++++-
.../flink/source/ContinuousFileStoreSource.java | 3 +-
.../AlignedContinuousFileSplitEnumerator.java | 6 +-
.../align/AlignedContinuousFileStoreSource.java | 3 +-
.../source/ContinuousFileSplitEnumeratorTest.java | 66 +++++++++++++++++++++-
.../AlignedContinuousFileSplitEnumeratorTest.java | 3 +-
8 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/flink_connector_configuration.html b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index 53f6884279..b73ffc9dff 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -140,6 +140,12 @@ under the License.
<td>Integer</td>
<td>If scan.infer-parallelism is true, limit the parallelism of source through this option.</td>
</tr>
+ <tr>
+ <td><h5>scan.max-snapshot.count</h5></td>
+ <td style="word-wrap: break-word;">-1</td>
+ <td>Integer</td>
+ <td>The max snapshot count to scan per checkpoint. Not limited when it's negative.</td>
+ </tr>
<tr>
<td><h5>scan.parallelism</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index a0ff463356..b39e9e57e3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -191,6 +191,13 @@ public class FlinkConnectorOptions {
"How many splits should assign to subtask per
batch in StaticFileStoreSplitEnumerator "
+ "to avoid exceed `akka.framesize`
limit.");
+ public static final ConfigOption<Integer> SCAN_MAX_SNAPSHOT_COUNT =
+ key("scan.max-snapshot.count")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The max snapshot count to scan per checkpoint.
Not limited when it's negative.");
+
public static final ConfigOption<SplitAssignMode>
SCAN_SPLIT_ENUMERATOR_ASSIGN_MODE =
key("scan.split-enumerator.mode")
.enumType(SplitAssignMode.class)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 56e19dcf62..79597d0cd2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -84,6 +84,10 @@ public class ContinuousFileSplitEnumerator
private boolean stopTriggerScan = false;
+ private long handledSnapshotCount = 0;
+
+ private final int maxSnapshotCount;
+
public ContinuousFileSplitEnumerator(
SplitEnumeratorContext<FileStoreSourceSplit> context,
Collection<FileStoreSourceSplit> remainSplits,
@@ -92,7 +96,8 @@ public class ContinuousFileSplitEnumerator
StreamTableScan scan,
BucketMode bucketMode,
int splitMaxPerTask,
- boolean shuffleBucketWithPartition) {
+ boolean shuffleBucketWithPartition,
+ int maxSnapshotCount) {
checkArgument(discoveryInterval > 0L);
this.context = checkNotNull(context);
this.nextSnapshotId = nextSnapshotId;
@@ -107,6 +112,7 @@ public class ContinuousFileSplitEnumerator
this.consumerProgressCalculator =
new ConsumerProgressCalculator(context.currentParallelism());
+ this.maxSnapshotCount = maxSnapshotCount;
}
@VisibleForTesting
@@ -189,6 +195,7 @@ public class ContinuousFileSplitEnumerator
consumerProgressCalculator
.notifyCheckpointComplete(checkpointId)
.ifPresent(scan::notifyCheckpointComplete);
+ handledSnapshotCount = 0;
}
// ------------------------------------------------------------------------
@@ -200,8 +207,22 @@ public class ContinuousFileSplitEnumerator
if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) {
return Optional.empty();
}
+ if (maxSnapshotCount > 0 && handledSnapshotCount >= maxSnapshotCount) {
+ LOG.debug(
+ "There is {} in-flight snapshot, pending to scan next
snapshot.",
+ handledSnapshotCount);
+ return Optional.empty();
+ }
+
TableScan.Plan plan = scan.plan();
Long nextSnapshotId = scan.checkpoint();
+ if (nextSnapshotId != null && !plan.splits().isEmpty()) {
+ if (this.nextSnapshotId == null) {
+ handledSnapshotCount++;
+ } else if (!nextSnapshotId.equals(this.nextSnapshotId)) {
+ handledSnapshotCount++;
+ }
+ }
return Optional.of(new PlanWithNextSnapshotId(plan, nextSnapshotId));
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
index db39f90455..879f262bbd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileStoreSource.java
@@ -112,6 +112,7 @@ public class ContinuousFileStoreSource extends FlinkSource {
scan,
bucketMode,
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
+
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+ options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
index 4ed4461dd7..857c1b7ab7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumerator.java
@@ -95,7 +95,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
BucketMode bucketMode,
long alignTimeout,
int splitPerTaskMax,
- boolean shuffleBucketWithPartition) {
+ boolean shuffleBucketWithPartition,
+ int maxSnapshotCount) {
super(
context,
remainSplits,
@@ -104,7 +105,8 @@ public class AlignedContinuousFileSplitEnumerator extends ContinuousFileSplitEnu
scan,
bucketMode,
splitPerTaskMax,
- shuffleBucketWithPartition);
+ shuffleBucketWithPartition,
+ maxSnapshotCount);
this.pendingPlans = new ArrayBlockingQueue<>(MAX_PENDING_PLAN);
this.alignedAssigner = (AlignedSplitAssigner) super.splitAssigner;
this.nextSnapshotId = nextSnapshotId;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index 1b3e7b5b19..8904720c90 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -85,6 +85,7 @@ public class AlignedContinuousFileStoreSource extends ContinuousFileStoreSource
bucketMode,
options.get(FlinkConnectorOptions.SOURCE_CHECKPOINT_ALIGN_TIMEOUT).toMillis(),
options.get(CoreOptions.SCAN_MAX_SPLITS_PER_TASK),
-
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION));
+
options.get(FlinkConnectorOptions.STREAMING_READ_SHUFFLE_BUCKET_WITH_PARTITION),
+ options.get(FlinkConnectorOptions.SCAN_MAX_SNAPSHOT_COUNT));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 6f05e5dd7d..e3912cc707 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -802,6 +802,56 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(15
* 2);
}
+ @Test
+ public void testEnumeratorSnapshotMax() throws Exception {
+ final TestingSplitEnumeratorContext<FileStoreSourceSplit> context =
+ getSplitEnumeratorContext(2);
+
+ TreeMap<Long, TableScan.Plan> results = new TreeMap<>();
+ StreamTableScan scan = new MockScan(results);
+ ContinuousFileSplitEnumerator enumerator =
+ new Builder()
+ .setSplitEnumeratorContext(context)
+ .setInitialSplits(Collections.emptyList())
+ .setDiscoveryInterval(1)
+ .setScan(scan)
+ .withBucketMode(BucketMode.BUCKET_UNAWARE)
+ .withMaxSnapshotCount(1)
+ .build();
+ enumerator.start();
+
+ long snapshot = 0;
+ List<DataSplit> splits = new ArrayList<>();
+ // splits 1
+ splits.add(createDataSplit(snapshot++, 0, Collections.emptyList()));
+ results.put(1L, new DataFilePlan(splits));
+ context.triggerAllActions();
+
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+
+ // splits 2
+ splits = new ArrayList<>();
+ splits.add(createDataSplit(snapshot++, 0, Collections.emptyList()));
+ results.put(2L, new DataFilePlan(splits));
+ context.triggerAllActions();
+
+ // The snapshot 2 is pending to scan.
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+
+ // consumed splits 1
+ enumerator.handleSplitRequest(0, "test");
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(0);
+ context.triggerAllActions();
+
+ // no new snapshot is scanned, because checkpoint is not completed.
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(0);
+
+ enumerator.notifyCheckpointComplete(1);
+ context.triggerAllActions();
+
Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(1);
+ Assertions.assertThat(enumerator.nextSnapshotId).isEqualTo(3);
+ }
+
private void triggerCheckpointAndComplete(
ContinuousFileSplitEnumerator enumerator, long checkpointId)
throws Exception {
enumerator.snapshotState(checkpointId);
@@ -850,6 +900,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
private StreamTableScan scan;
private BucketMode bucketMode = BucketMode.HASH_FIXED;
+ private int maxSnapshotCount = -1;
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
@@ -877,9 +928,22 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
return this;
}
+ public Builder withMaxSnapshotCount(int maxSnapshotCount) {
+ this.maxSnapshotCount = maxSnapshotCount;
+ return this;
+ }
+
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
- context, initialSplits, null, discoveryInterval, scan,
bucketMode, 10, false);
+ context,
+ initialSplits,
+ null,
+ discoveryInterval,
+ scan,
+ bucketMode,
+ 10,
+ false,
+ maxSnapshotCount);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
index a5edc28040..a3c26ce80d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedContinuousFileSplitEnumeratorTest.java
@@ -253,7 +253,8 @@ public class AlignedContinuousFileSplitEnumeratorTest extends FileSplitEnumerato
bucketMode,
timeout,
10,
- false);
+ false,
+ -1);
}
}
}