This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 94a416737c [flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835)
94a416737c is described below
commit 94a416737c875716941730a479195697b4bac50d
Author: zhuanshenbsj1 <[email protected]>
AuthorDate: Fri Jul 4 20:47:04 2025 +0800
[flink] ContinuousFileSplitEnumerator supports setting splitMaxPerTask for SplitAssigner. (#5835)
---
.../source/ContinuousFileSplitEnumerator.java | 8 +++--
.../source/ContinuousFileSplitEnumeratorTest.java | 41 ++++++++++++++++------
2 files changed, 37 insertions(+), 12 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
index 4db86da117..38c593e75d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java
@@ -69,6 +69,8 @@ public class ContinuousFileSplitEnumerator
protected final StreamTableScan scan;
+ protected final int splitMaxPerTask;
+
protected final SplitAssigner splitAssigner;
protected final ConsumerProgressCalculator consumerProgressCalculator;
@@ -104,8 +106,9 @@ public class ContinuousFileSplitEnumerator
this.readersAwaitingSplit = new LinkedHashSet<>();
this.splitGenerator = new FileStoreSourceSplitGenerator();
this.scan = scan;
- this.splitAssigner = createSplitAssigner(unawareBucket);
+ this.splitMaxPerTask = splitMaxPerTask;
this.splitMaxNum = context.currentParallelism() * splitMaxPerTask;
+ this.splitAssigner = createSplitAssigner(unawareBucket);
this.shuffleBucketWithPartition = shuffleBucketWithPartition;
addSplits(remainSplits);
@@ -311,7 +314,8 @@ public class ContinuousFileSplitEnumerator
protected SplitAssigner createSplitAssigner(boolean unawareBucket) {
return unawareBucket
? new FIFOSplitAssigner(Collections.emptyList())
- : new PreAssignSplitAssigner(1, context, Collections.emptyList());
+ : new PreAssignSplitAssigner(
+ this.splitMaxPerTask, context, Collections.emptyList());
}
protected boolean noMoreSplits() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
index 05e3d3b0b6..90c35350eb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java
@@ -74,6 +74,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setSplitEnumeratorContext(context)
.setInitialSplits(initialSplits)
.setDiscoveryInterval(3)
+ .withSplitMaxPerTask(1)
.build();
// The first time split is allocated, split1 and split2 should be allocated
@@ -125,6 +126,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setSplitEnumeratorContext(context)
.setInitialSplits(initialSplits)
.setDiscoveryInterval(3)
+ .withSplitMaxPerTask(1)
.build();
// The first time split is allocated, split1 and split2 should be allocated
@@ -166,6 +168,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setSplitEnumeratorContext(context)
.setInitialSplits(initialSplits)
.setDiscoveryInterval(3)
+ .withSplitMaxPerTask(1)
.build();
// each time a split is allocated from bucket-0 and bucket-1
@@ -205,6 +208,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.build();
enumerator.start();
@@ -231,11 +235,17 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
assertThat(toDataSplits(assignments.get(0).getAssignedSplits()))
.containsExactly(splits.get(0), splits.get(2));
+ // assign to task 1
+ enumerator.handleSplitRequest(1, "test-host");
+ assignments = context.getSplitAssignments();
+ assertThat(assignments).containsKey(1);
+ assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
+ .containsExactly(splits.get(1));
+
// no more splits task 0
enumerator.handleSplitRequest(0, "test-host");
context.triggerAllActions();
assignments = context.getSplitAssignments();
- assertThat(assignments).containsOnlyKeys(0);
assertThat(assignments.get(0).hasReceivedNoMoreSplitsSignal()).isTrue();
assignments.clear();
@@ -244,14 +254,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
assignments = context.getSplitAssignments();
assertThat(assignments).containsOnlyKeys(1);
assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
- .containsExactly(splits.get(1));
-
- // assign to task 1
- enumerator.handleSplitRequest(1, "test-host");
- assignments = context.getSplitAssignments();
- assertThat(assignments).containsOnlyKeys(1);
- assertThat(toDataSplits(assignments.get(1).getAssignedSplits()))
- .containsExactly(splits.get(1), splits.get(3));
+ .containsExactly(splits.get(3));
// no more splits task 1
enumerator.handleSplitRequest(1, "test-host");
@@ -273,6 +276,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.unawareBucket(true)
.build();
enumerator.start();
@@ -315,6 +319,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.unawareBucket(true)
.build();
enumerator.start();
@@ -374,6 +379,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.unawareBucket(true)
.build();
enumerator.start();
@@ -430,6 +436,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.unawareBucket(true)
.build();
enumerator.start();
@@ -469,6 +476,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.unawareBucket(true)
.build();
enumerator.start();
@@ -501,6 +509,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.build();
enumerator.start();
@@ -542,6 +551,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.build();
enumerator.start();
enumerator.handleSplitRequest(0, "test-host");
@@ -641,6 +651,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(1)
.build();
enumerator.start();
@@ -709,6 +720,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setSplitEnumeratorContext(context)
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
+ .withSplitMaxPerTask(1)
.setScan(scan)
.build();
enumerator.start();
@@ -765,6 +777,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setInitialSplits(Collections.emptyList())
.setDiscoveryInterval(1)
.setScan(scan)
+ .withSplitMaxPerTask(10)
.unawareBucket(true)
.build();
enumerator.start();
@@ -816,6 +829,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
.setScan(scan)
.unawareBucket(true)
.withMaxSnapshotCount(1)
+ .withSplitMaxPerTask(1)
.build();
enumerator.start();
@@ -901,6 +915,8 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
private boolean unawareBucket = false;
private int maxSnapshotCount = -1;
+ private int splitMaxPerTask = 10;
+
public Builder setSplitEnumeratorContext(
SplitEnumeratorContext<FileStoreSourceSplit> context) {
this.context = context;
@@ -932,6 +948,11 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
return this;
}
+ public Builder withSplitMaxPerTask(int splitMaxPerTask) {
+ this.splitMaxPerTask = splitMaxPerTask;
+ return this;
+ }
+
public ContinuousFileSplitEnumerator build() {
return new ContinuousFileSplitEnumerator(
context,
@@ -940,7 +961,7 @@ public class ContinuousFileSplitEnumeratorTest extends FileSplitEnumeratorTestBa
discoveryInterval,
scan,
unawareBucket,
- 10,
+ this.splitMaxPerTask,
false,
maxSnapshotCount);
}