JunRuiLee commented on PR #24736:
URL: https://github.com/apache/flink/pull/24736#issuecomment-2184726093

   > > > > Thanks, @venkata91, for your contribution! After reviewing this PR, 
I'm concerned that it entirely removes limit that source parallelism should 
lower than source jobVertex's max parallelism. And I think the goal of this pr 
is ensure source parallelism isn't limited by config option 
execution.batch.adaptive.auto-parallelism.max-parallelism, but still respects 
the max parallelism of source jobVertex.
   > > > > WDYT?
   > > > 
   > > > 
   > > > I think that makes sense. Basically what you're saying is if `source's 
max parallelism` is determined by the `source` itself which is < 
`default-source-parallelism` config, we should cap it by the `source computed 
max parallelism` correct? If so, I agree with that.
   > > 
   > > 
   > > Yes, that's correct.
   > 
   > @JunRuiLee Sorry for the late reply. I looked at the code again and it 
does look to be doing as what we expected. Can you please point me to the 
corresponding code reference?
   
   Sorry for the late response. My point is that the upper bound returned by 
`computeSourceParallelismUpperBound` should be the minimum of 
`execution.batch.adaptive.auto-parallelism.default-source-parallelism` and the 
maximum parallelism of the JobVertex itself, rather than just considering 
`execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
   
   A simple reproducible case is to replace the executeJob method in 
`AdaptiveBatchSchedulerITCase` with the following code:
   ```
   private void executeJob(Boolean useSourceParallelismInference) throws 
Exception {
           final Configuration configuration = createConfiguration();
           
configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);
   
           final StreamExecutionEnvironment env =
                   
StreamExecutionEnvironment.createLocalEnvironment(configuration);
           env.setRuntimeMode(RuntimeExecutionMode.BATCH);
   
           List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();
   
           for (int i = 0; i < 3; ++i) {
               SlotSharingGroup group =
                       SlotSharingGroup.newBuilder("group" + i)
                               .setCpuCores(1.0)
                               .setTaskHeapMemory(MemorySize.parse("100m"))
                               .build();
               slotSharingGroups.add(group);
           }
   
           DataStream<Long> source1;
           DataStream<Long> source2;
   
           if (useSourceParallelismInference) {
               source1 =
                       env.fromSource(
                                       new 
TestingParallelismInferenceNumberSequenceSource(
                                               0, NUMBERS_TO_PRODUCE - 1, 
SOURCE_PARALLELISM_1),
                                       WatermarkStrategy.noWatermarks(),
                                       "source1")
                               .slotSharingGroup(slotSharingGroups.get(0));
               source2 =
                       env.fromSource(
                                       new 
TestingParallelismInferenceNumberSequenceSource(
                                               0, NUMBERS_TO_PRODUCE - 1, 
SOURCE_PARALLELISM_2),
                                       WatermarkStrategy.noWatermarks(),
                                       "source2")
                               .slotSharingGroup(slotSharingGroups.get(1));
           } else {
               source1 =
                       env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                               .setParallelism(-1)
                               .name("source1")
                               .slotSharingGroup(slotSharingGroups.get(0))
                               .setMaxParallelism(2);
               source2 =
                       env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                               .setParallelism(SOURCE_PARALLELISM_2)
                               .name("source2")
                               .slotSharingGroup(slotSharingGroups.get(1));
           }
   
           source1.union(source2)
                   .rescale()
                   .map(new NumberCounter())
                   .name("map")
                   .slotSharingGroup(slotSharingGroups.get(2));
   
           env.execute();
       }
   ```
   Then run the `testScheduling` case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to