noorall commented on code in PR #25552:
URL: https://github.com/apache/flink/pull/25552#discussion_r1901523524
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -126,52 +136,101 @@ public ParallelismAndInputInfos
decideParallelismAndInputInfosForVertex(
? vertexInitialParallelism
: computeSourceParallelismUpperBound(jobVertexId,
vertexMaxParallelism);
return new ParallelismAndInputInfos(parallelism,
Collections.emptyMap());
- } else {
- int minParallelism = Math.max(globalMinParallelism,
vertexMinParallelism);
- int maxParallelism = globalMaxParallelism;
-
- if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
- && vertexMaxParallelism < minParallelism) {
- LOG.info(
- "The vertex maximum parallelism {} is smaller than the
minimum parallelism {}. "
- + "Use {} as the lower bound to decide
parallelism of job vertex {}.",
- vertexMaxParallelism,
- minParallelism,
- vertexMaxParallelism,
- jobVertexId);
- minParallelism = vertexMaxParallelism;
- }
- if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
- && vertexMaxParallelism < maxParallelism) {
- LOG.info(
- "The vertex maximum parallelism {} is smaller than the
global maximum parallelism {}. "
- + "Use {} as the upper bound to decide
parallelism of job vertex {}.",
- vertexMaxParallelism,
- maxParallelism,
- vertexMaxParallelism,
- jobVertexId);
- maxParallelism = vertexMaxParallelism;
- }
- checkState(maxParallelism >= minParallelism);
-
- if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
- && areAllInputsAllToAll(consumedResults)
- && !areAllInputsBroadcast(consumedResults)) {
- return decideParallelismAndEvenlyDistributeData(
- jobVertexId,
- consumedResults,
- vertexInitialParallelism,
- minParallelism,
- maxParallelism);
- } else {
- return decideParallelismAndEvenlyDistributeSubpartitions(
- jobVertexId,
- consumedResults,
- vertexInitialParallelism,
- minParallelism,
- maxParallelism);
+ }
+
+ int minParallelism = Math.max(globalMinParallelism,
vertexMinParallelism);
+ int maxParallelism = globalMaxParallelism;
+
+ if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+ && vertexMaxParallelism < minParallelism) {
+ LOG.info(
+ "The vertex maximum parallelism {} is smaller than the
minimum parallelism {}. "
+ + "Use {} as the lower bound to decide parallelism
of job vertex {}.",
+ vertexMaxParallelism,
+ minParallelism,
+ vertexMaxParallelism,
+ jobVertexId);
+ minParallelism = vertexMaxParallelism;
+ }
+ if (vertexInitialParallelism == ExecutionConfig.PARALLELISM_DEFAULT
+ && vertexMaxParallelism < maxParallelism) {
+ LOG.info(
+ "The vertex maximum parallelism {} is smaller than the
global maximum parallelism {}. "
+ + "Use {} as the upper bound to decide parallelism
of job vertex {}.",
+ vertexMaxParallelism,
+ maxParallelism,
+ vertexMaxParallelism,
+ jobVertexId);
+ maxParallelism = vertexMaxParallelism;
+ }
+ checkState(maxParallelism >= minParallelism);
+
+ int parallelism =
+ vertexInitialParallelism > 0
+ ? vertexInitialParallelism
+ : decideParallelism(
+ jobVertexId, consumedResults, minParallelism,
maxParallelism);
+
+ Map<Boolean, List<BlockingInputInfo>> inputsGroupByInterCorrelation =
+ consumedResults.stream()
+ .collect(
+ Collectors.groupingBy(
+
BlockingInputInfo::existInterInputsKeyCorrelation));
+
+ // For AllToAll like inputs, we derive parallelism as a whole, while
for Pointwise inputs,
+ // we need to derive parallelism separately for each input.
+ //
+ // In the following cases, we need to reset min parallelism and max
parallelism to ensure
+ // that the decide parallelism for all inputs is consistent :
+ // 1. Vertex has a specified parallelism
+ // 2. There are edges that don't need to follow intergroup constraint
Review Comment:
> Why?
If the vertex specifies a parallelism, we should follow it.
If there is a pointwise input (i.e. one whose existInterInputsKeyCorrelation is
false), its parallelism is derived separately for each individual input, one by
one. We therefore need to pre-determine the parallelism so that the parallelism
decided for all inputs is consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]