1996fanrui commented on code in PR #27635:
URL: https://github.com/apache/flink/pull/27635#discussion_r2837884399


##########
flink-core/src/main/java/org/apache/flink/api/connector/source/lib/NumberSequenceSource.java:
##########
@@ -116,7 +116,7 @@ public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> cre
             final SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
 
         final List<NumberSequenceSplit> splits =
-                splitNumberRange(from, to, enumContext.currentParallelism());
+                splitNumberRange(from, to, 10 /* enumContext.currentParallelism() */);

Review Comment:
   Thanks for the analysis. I reproduced the issue and investigated it further.
   
   The root cause is that, on restore, the available splits come from the checkpoint state, not from the new parallelism. For example, on the first run with parallelism 3, `NumberSequenceSource` creates 3 splits and assigns all of them immediately, so the checkpoint saves 0 remaining splits. When restoring with parallelism 7, `restoreEnumerator()` gets 0 splits from the checkpoint, so the 4 extra subtasks receive `NoMoreSplits` and finish.
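   
   A minimal model of that arithmetic, as a sketch only: `RestoreModel` and `idleAfterRestore` are hypothetical names, not Flink APIs. The model assumes each reader keeps the single split it is given (the sequence is effectively unbounded, so it never finishes) and that only unassigned splits end up in the enumerator checkpoint. The second call uses 7 initial splits as an assumed example value.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   // Hypothetical model, not Flink code: the enumerator hands out one split per reader
   // request and checkpoints only the splits it has not assigned yet.
   final class RestoreModel {
   
       /** Number of subtasks that get no split (NoMoreSplits) after restoring. */
       static int idleAfterRestore(int initialSplits, int oldParallelism, int newParallelism) {
           Deque<Integer> unassigned = new ArrayDeque<>();
           for (int i = 0; i < initialSplits; i++) {
               unassigned.add(i);
           }
           // First run: each reader takes one split and never finishes it (unbounded sequence).
           int assigned = 0;
           while (assigned < oldParallelism && !unassigned.isEmpty()) {
               unassigned.poll();
               assigned++;
           }
           // Restore: assigned splits come back via reader state; any additional subtasks
           // can only be served from the checkpointed enumerator state (`unassigned`).
           int extraSubtasks = Math.max(0, newParallelism - assigned);
           return Math.max(0, extraSubtasks - unassigned.size());
       }
   
       public static void main(String[] args) {
           System.out.println(idleAfterRestore(3, 3, 7)); // prints 4
           System.out.println(idleAfterRestore(7, 3, 7)); // prints 0
       }
   }
   ```
   
   Under these assumptions, the outcome depends only on how many splits the first run creates, not on the parallelism used for the restore.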
   
   I like your Option 3 from https://issues.apache.org/jira/browse/FLINK-38613, but scoped to the test only, with no production code changes. I subclassed `DataGeneratorSource` and overrode `createEnumerator()` so that it always creates at least `MAX_SLOTS` splits:
   
   ```java
   private static class TestDataGeneratorSource extends DataGeneratorSource<Long> {
       TestDataGeneratorSource() {
           super(index -> index, Long.MAX_VALUE, RateLimiterStrategy.perSecond(5000), Types.LONG);
       }
   
       // Always create at least MAX_SLOTS splits (a constant of the enclosing test), so that
       // unassigned splits remain in the enumerator checkpoint and can be handed to the
       // extra subtasks after restoring with a higher parallelism.
       @Override
       public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>>
               createEnumerator(SplitEnumeratorContext<NumberSequenceSplit> enumContext) {
           // Covers [0, Long.MAX_VALUE - 1], i.e. the Long.MAX_VALUE elements passed to super().
           NumberSequenceSource source =
                   new NumberSequenceSource(0, Long.MAX_VALUE - 1) {
                       @Override
                       protected List<NumberSequenceSplit> splitNumberRange(
                               long from, long to, int numSplits) {
                           return super.splitNumberRange(
                                   from, to, Math.max(numSplits, MAX_SLOTS));
                       }
                   };
           return source.createEnumerator(enumContext);
       }
   }
   ```
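   
   Raising the split count only changes how finely the range `[0, Long.MAX_VALUE - 1]` is divided. A sketch of an even partition into `max(numSplits, maxSlots)` contiguous chunks, with the remainder spread over the first chunks; this is an illustration, not necessarily what `NumberSequenceSource.splitNumberRange` does internally. `RangeSplitSketch`, `splitRange` and the `maxSlots` parameter (standing in for `MAX_SLOTS`) are hypothetical.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Hypothetical sketch, not Flink's splitNumberRange: partition the inclusive range
   // [from, to] into max(numSplits, maxSlots) contiguous chunks of near-equal size.
   final class RangeSplitSketch {
   
       static List<long[]> splitRange(long from, long to, int numSplits, int maxSlots) {
           int n = Math.max(numSplits, maxSlots); // same floor as the test override
           long total = to - from + 1; // assumes the range size fits in a long and total >= n
           long base = total / n;
           long rest = total % n; // the first `rest` chunks get one extra element
           List<long[]> chunks = new ArrayList<>(n);
           long start = from;
           for (int i = 0; i < n; i++) {
               long size = base + (i < rest ? 1 : 0);
               long end = start + size - 1;
               chunks.add(new long[] {start, end});
               start = end + 1;
           }
           return chunks;
       }
   
       public static void main(String[] args) {
           // 7 chunks: 0..14, 15..29, 30..43, 44..57, 58..71, 72..85, 86..99
           for (long[] c : splitRange(0, 99, 3, 7)) {
               System.out.println(c[0] + ".." + c[1]);
           }
       }
   }
   ```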
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.