JunRuiLee commented on PR #24736: URL: https://github.com/apache/flink/pull/24736#issuecomment-2184726093
> > > > Thanks, @venkata91, for your contribution! After reviewing this PR, I'm concerned that it entirely removes the limit that the source parallelism should not exceed the source JobVertex's max parallelism. I think the goal of this PR is to ensure the source parallelism isn't limited by the config option `execution.batch.adaptive.auto-parallelism.max-parallelism`, while still respecting the max parallelism of the source JobVertex.
> > > >
> > > > WDYT?
> > >
> > > I think that makes sense. Basically, what you're saying is that if the `source's max parallelism` is determined by the `source` itself and is < the `default-source-parallelism` config, we should cap it by the `source computed max parallelism`, correct? If so, I agree with that.
> >
> > Yes, that's correct.
>
> @JunRuiLee Sorry for the late reply. I looked at the code again and it does look like it's doing what we expected. Can you please point me to the corresponding code reference?

Sorry for the late response. My point is that the upper bound returned by `computeSourceParallelismUpperBound` should be the minimum of `execution.batch.adaptive.auto-parallelism.default-source-parallelism` and the maximum parallelism of the JobVertex itself, rather than considering only `execution.batch.adaptive.auto-parallelism.default-source-parallelism`.
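To make the proposed bound concrete, here is a minimal sketch in Java. It is a hypothetical standalone helper, not the code in this PR or in Flink: the class name, the two `int` parameters, and the convention that a non-positive value means the JobVertex max parallelism was not set are assumptions for illustration only.

```java
/**
 * Hypothetical sketch of the proposed upper bound. Not Flink's actual
 * computeSourceParallelismUpperBound; names and parameters are illustrative.
 */
public final class SourceParallelismUpperBound {

    private SourceParallelismUpperBound() {}

    /**
     * @param defaultSourceParallelism value of
     *     execution.batch.adaptive.auto-parallelism.default-source-parallelism
     * @param vertexMaxParallelism max parallelism of the source JobVertex; assumed to be a
     *     non-positive value (e.g. -1) when it has not been set
     * @return the largest parallelism the source may be inferred to
     */
    public static int computeSourceParallelismUpperBound(
            int defaultSourceParallelism, int vertexMaxParallelism) {
        if (vertexMaxParallelism <= 0) {
            // No explicit vertex max parallelism: only the config option applies.
            return defaultSourceParallelism;
        }
        // Respect the JobVertex's own max parallelism as well as the config option.
        return Math.min(defaultSourceParallelism, vertexMaxParallelism);
    }

    public static void main(String[] args) {
        // Illustrative values: a config default of 8 and a source whose max parallelism is 2,
        // like source1 with setMaxParallelism(2) in the reproduction case.
        System.out.println(computeSourceParallelismUpperBound(8, 2)); // prints 2
        System.out.println(computeSourceParallelismUpperBound(8, -1)); // prints 8
    }
}
```

Taking the minimum keeps the config option as a global ceiling while still honoring a stricter per-vertex limit, which is the behavior agreed on in the quoted discussion.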
A simple way to reproduce this is to replace the `executeJob` method in `AdaptiveBatchSchedulerITCase` with the following code:

```java
private void executeJob(Boolean useSourceParallelismInference) throws Exception {
    final Configuration configuration = createConfiguration();
    configuration.set(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING, true);

    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    List<SlotSharingGroup> slotSharingGroups = new ArrayList<>();

    for (int i = 0; i < 3; ++i) {
        SlotSharingGroup group =
                SlotSharingGroup.newBuilder("group" + i)
                        .setCpuCores(1.0)
                        .setTaskHeapMemory(MemorySize.parse("100m"))
                        .build();
        slotSharingGroups.add(group);
    }

    DataStream<Long> source1;
    DataStream<Long> source2;

    if (useSourceParallelismInference) {
        source1 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_1),
                                WatermarkStrategy.noWatermarks(),
                                "source1")
                        .slotSharingGroup(slotSharingGroups.get(0));
        source2 =
                env.fromSource(
                                new TestingParallelismInferenceNumberSequenceSource(
                                        0, NUMBERS_TO_PRODUCE - 1, SOURCE_PARALLELISM_2),
                                WatermarkStrategy.noWatermarks(),
                                "source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    } else {
        source1 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        // Leave the parallelism unset (-1) so the adaptive batch scheduler decides it.
                        .setParallelism(-1)
                        .name("source1")
                        .slotSharingGroup(slotSharingGroups.get(0))
                        // Caps source1's max parallelism at 2; the decided parallelism should not exceed it.
                        .setMaxParallelism(2);
        source2 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(SOURCE_PARALLELISM_2)
                        .name("source2")
                        .slotSharingGroup(slotSharingGroups.get(1));
    }

    source1.union(source2)
            .rescale()
            .map(new NumberCounter())
            .name("map")
            .slotSharingGroup(slotSharingGroups.get(2));

    env.execute();
}
```

Then run the `testScheduling` case.