mxm commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1760807867
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
@1996fanrui The code path you are pointing to hasn't been changed by this PR; it has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80. Are you asking to expand the source logic introduced here to hash-keyed state?

> For this scenario, there is no difference when the parallelism is set to 50 and 99.

That depends on the amount of state in the key groups and other factors like hot keys. 50 could perform the same as 99, or it could be much worse. For sources, the problem is amplified because their partitions are usually fairly evenly balanced and there is extra overhead to fetch the partition data. In this scenario, though, we should not be going down to 50; we should probably be going up to 100.

I think the `maxParallelism / 2` cutoff stems from the idea that the maximum parallelism won't be reached because the parallelism is set to a value `parallelism <= maxParallelism / 2`, which would mean it doesn't make sense to continue beyond `maxParallelism / 2` since there are no divisors of `maxParallelism` strictly between `maxParallelism / 2` and `maxParallelism`. However, when `parallelism > maxParallelism / 2`, this logic is flawed because `maxParallelism` itself could be a possible divisor. We should really be searching up to `maxParallelism` when the initial `parallelism > maxParallelism / 2`. Alternatively, we could skip this (premature) optimization entirely.

I agree that we should replace the current key group alignment logic with the generalized source logic introduced here. Something like this:

```java
final int numKeyGroupsOrPartitions;
final int upperBoundForAlignment;
if (numSourcePartitions <= 0) {
    numKeyGroupsOrPartitions = maxParallelism;
    upperBoundForAlignment =
            Math.min(
                    // Optimize the common case where newParallelism <= maxParallelism / 2
                    newParallelism > maxParallelism / 2
                            ? maxParallelism
                            : maxParallelism / 2,
                    upperBound);
} else {
    numKeyGroupsOrPartitions = numSourcePartitions;
    upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
}

// When the shuffle type of the vertex inputs contains keyBy or the vertex is a source,
// we try to adjust the parallelism such that it divides numKeyGroupsOrPartitions
// without a remainder => data is evenly spread across subtasks.
for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
    if (numKeyGroupsOrPartitions % p == 0) {
        return p;
    }
}

// If the rounded-up parallelism does not evenly divide numKeyGroupsOrPartitions,
// try to find the smallest parallelism that still satisfies the current consumption rate.
int p = newParallelism;
for (; p > 0; p--) {
    if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
        if (numKeyGroupsOrPartitions % p != 0) {
            p++;
        }
        break;
    }
}
p = Math.max(p, parallelismLowerLimit);
return p;
```
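To make the divisor argument concrete, here is a minimal, self-contained sketch (the class and helper names below are hypothetical, not part of `JobVertexScaler`) contrasting the old `maxParallelism / 2` cutoff with searching all the way up to `maxParallelism` for the maxParallelism=100, newParallelism=80 case discussed above:

```java
// Illustrative sketch only; it reimplements just the divisor-alignment loop
// described in the comment, not the operator's actual scale() method.
public class DivisorAlignmentSketch {

    // Return the first p in [newParallelism, upperBoundForAlignment] that divides
    // numKeyGroupsOrPartitions without remainder; otherwise keep newParallelism.
    static int align(int newParallelism, int numKeyGroupsOrPartitions, int upperBoundForAlignment) {
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }
        return newParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 100;
        int newParallelism = 80;

        // Old cutoff: 80 > maxParallelism / 2 == 50, so the loop never runs and the
        // uneven 80 is kept (20 subtasks process 2 key groups, 60 process only 1).
        System.out.println(align(newParallelism, maxParallelism, maxParallelism / 2)); // prints 80

        // Searching up to maxParallelism finds 100, which divides the 100 key groups evenly.
        System.out.println(align(newParallelism, maxParallelism, maxParallelism)); // prints 100
    }
}
```

Under the old cutoff the alignment loop is skipped entirely for this input, while extending the search bound lets `maxParallelism` itself be picked as the evenly dividing parallelism.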