mxm commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1760807867


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
   @1996fanrui The code path you are pointing to hasn't been changed by this PR; it has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80, because the alignment loop never runs (80 > 100 / 2).
   
   Are you asking to expand the source logic introduced here to hash keyed state?
   
   > For this scenario, there is no difference when the parallelism is set to 50 and 99.
   
   That depends on the amount of state in the key groups and on other factors like hot keys. 50 could perform the same as 99, or it could be much worse. For sources, the problem is amplified because they usually have fairly evenly balanced partitions and there is extra overhead to fetch the partition data.
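   To make the 50 vs. 99 comparison concrete, the sketch below counts how many key groups each subtask receives for maxParallelism=100. It re-implements, as an assumption about Flink internals, the range formula that I believe `KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex` uses; the class and method names are hypothetical. Assuming uniformly sized key groups, the busiest subtask holds 2 key groups at 50, 80, and 99 alike, but at 80 and 99 the load is uneven across subtasks, so skewed state or hot keys decide how much that matters.

   ```java
   import java.util.Arrays;

   public class KeyGroupSpread {

       // First key group of a subtask (assumed to match Flink's KeyGroupRangeAssignment).
       static int rangeStart(int maxParallelism, int parallelism, int index) {
           return (index * maxParallelism + parallelism - 1) / parallelism;
       }

       // Last key group (inclusive) of a subtask.
       static int rangeEnd(int maxParallelism, int parallelism, int index) {
           return ((index + 1) * maxParallelism - 1) / parallelism;
       }

       /** Returns {min, max} number of key groups assigned to any single subtask. */
       static int[] keyGroupSpread(int maxParallelism, int parallelism) {
           int min = Integer.MAX_VALUE;
           int max = 0;
           for (int i = 0; i < parallelism; i++) {
               int count = rangeEnd(maxParallelism, parallelism, i)
                       - rangeStart(maxParallelism, parallelism, i) + 1;
               min = Math.min(min, count);
               max = Math.max(max, count);
           }
           return new int[] {min, max};
       }

       public static void main(String[] args) {
           for (int p : new int[] {50, 80, 99, 100}) {
               System.out.println(p + " -> " + Arrays.toString(keyGroupSpread(100, p)));
           }
       }
   }
   ```

   Running `main` prints `50 -> [2, 2]`, `80 -> [1, 2]`, `99 -> [1, 2]` and `100 -> [1, 1]` (min and max key groups per subtask).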
   
   In this scenario, though, we should not be going down to 50; we should probably be going up to 100. I think the `maxParallelism / 2` bound stems from the assumption that the parallelism stays at or below `maxParallelism / 2`. Under that assumption it makes no sense to search beyond `maxParallelism / 2`, because no divisor of maxParallelism lies strictly between `maxParallelism / 2` and `maxParallelism`. However, when `parallelism > maxParallelism / 2`, this reasoning breaks down because maxParallelism itself is still a valid divisor. For an initial `parallelism > maxParallelism / 2`, we should really search all the way up to `maxParallelism`. Alternatively, we could skip this (premature) optimization entirely.
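   The divisor argument can be checked directly: any divisor `d` of `n` with `d > n / 2` gives `n / d < 2`, so `d == n`. The hypothetical helper below mirrors the shape of the alignment loop (ignoring `upperBound`, i.e. assuming it is large) and shows that for maxParallelism=100 and newParallelism=80 the `maxParallelism / 2` bound finds nothing, while bounding the search at `maxParallelism` lands on 100.

   ```java
   public class HalfBoundCheck {

       /**
        * Returns the first p in [start, bound] that divides n without a remainder,
        * or -1 if there is none (same shape as the alignment loop, without upperBound).
        */
       static int firstDivisor(int n, int start, int bound) {
           for (int p = start; p <= bound; p++) {
               if (n % p == 0) {
                   return p;
               }
           }
           return -1;
       }

       /** True if no divisor of n lies strictly between n / 2 and n. */
       static boolean noDivisorAboveHalf(int n) {
           for (int d = n / 2 + 1; d < n; d++) {
               if (n % d == 0) {
                   return false;
               }
           }
           return true;
       }

       public static void main(String[] args) {
           int maxParallelism = 100;
           int newParallelism = 80;
           // Current bound: 80 > 100 / 2, so the loop body never runs and -1 is returned.
           System.out.println(firstDivisor(maxParallelism, newParallelism, maxParallelism / 2));
           // Bounding the search at maxParallelism finds maxParallelism itself.
           System.out.println(firstDivisor(maxParallelism, newParallelism, maxParallelism));
           System.out.println(noDivisorAboveHalf(maxParallelism));
       }
   }
   ```

   `main` prints `-1`, `100` and `true`. Here `-1` only stands for "no aligned value found"; in the scenario above the scaler then keeps 80.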
   
   I agree that we should replace the current key alignment logic with the generalized source logic introduced here. Something like this:
   
   ```java
   final int numKeyGroupsOrPartitions;
   final int upperBoundForAlignment;
   if (numSourcePartitions <= 0) {
       numKeyGroupsOrPartitions = maxParallelism;
       upperBoundForAlignment = Math.min(
           // Optimize the case where newParallelism <= maxParallelism / 2
           newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
           upperBound
       );
   } else {
       numKeyGroupsOrPartitions = numSourcePartitions;
       upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
   }
   
   // When the shuffle type of vertex inputs contains keyBy or the vertex is a source,
   // we try to adjust the parallelism such that it divides numKeyGroupsOrPartitions
   // without a remainder => data is evenly spread across subtasks
   for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
       if (numKeyGroupsOrPartitions % p == 0) {
           return p;
       }
   }
   
   // If no parallelism between newParallelism and upperBoundForAlignment divides
   // numKeyGroupsOrPartitions evenly, try to find the smallest parallelism that
   // can still satisfy the current consumption rate.
   int p = newParallelism;
   for (; p > 0; p--) {
       if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
           if (numKeyGroupsOrPartitions % p != 0) {
               p++;
           }
           break;
       }
   }
   
   p = Math.max(p, parallelismLowerLimit);
   return p;
   ```
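   To sanity-check the proposal, here is the snippet above wrapped in a standalone static method. The class name `AlignmentSketch`, the method name `alignParallelism` and its parameter list are hypothetical; in the PR this logic lives inside `JobVertexScaler.scale`, where these values come from the surrounding code. The sample inputs are made up for illustration.

   ```java
   public class AlignmentSketch {

       /**
        * Hypothetical standalone version of the proposed alignment logic.
        * numSourcePartitions <= 0 means "not a source": align to key groups (maxParallelism).
        */
       static int alignParallelism(
               int newParallelism,
               int maxParallelism,
               int numSourcePartitions,
               int upperBound,
               int parallelismLowerLimit) {
           final int numKeyGroupsOrPartitions;
           final int upperBoundForAlignment;
           if (numSourcePartitions <= 0) {
               numKeyGroupsOrPartitions = maxParallelism;
               upperBoundForAlignment =
                       Math.min(
                               // Only search past maxParallelism / 2 if we already start above it.
                               newParallelism > maxParallelism / 2
                                       ? maxParallelism
                                       : maxParallelism / 2,
                               upperBound);
           } else {
               numKeyGroupsOrPartitions = numSourcePartitions;
               upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
           }

           // Scale up to the first parallelism that divides the key groups / partitions evenly.
           for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
               if (numKeyGroupsOrPartitions % p == 0) {
                   return p;
               }
           }

           // Fallback: walk down until the (floored) number of key groups / partitions per
           // subtask increases, then step back up by one unless that p divides them evenly.
           int p = newParallelism;
           for (; p > 0; p--) {
               if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                   if (numKeyGroupsOrPartitions % p != 0) {
                       p++;
                   }
                   break;
               }
           }
           return Math.max(p, parallelismLowerLimit);
       }

       public static void main(String[] args) {
           System.out.println(alignParallelism(80, 100, 0, 200, 1));
           System.out.println(alignParallelism(5, 128, 12, 200, 1));
           System.out.println(alignParallelism(7, 128, 10, 8, 1));
       }
   }
   ```

   For the three sample calls, `main` prints 100 (keyed vertex, maxParallelism=100, newParallelism=80), 6 (a source with 12 partitions starting at 5), and 5 (a source with 10 partitions starting at 7 with upperBound=8: no aligned value exists within the bound, so the fallback picks 5 subtasks with 2 partitions each, the same maximum per subtask as at 7).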



-- 
This is an automated message from the Apache Git Service.