This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 14f04033 [FLINK-39299][flink-autoscaler] Inconsistent vertex parallelism alignment logic
14f04033 is described below

commit 14f04033fc18fbde91bca14b6d38bbdee4ad3722
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Apr 2 16:15:06 2026 +0300

    [FLINK-39299][flink-autoscaler] Inconsistent vertex parallelism alignment logic
---
 .../apache/flink/autoscaler/JobVertexScaler.java   | 115 +++++++++++++++++----
 .../flink/autoscaler/JobAutoScalerImplTest.java    |   6 +-
 .../flink/autoscaler/JobVertexScalerTest.java      |   6 +-
 3 files changed, 103 insertions(+), 24 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 492615f4..dfad27ad 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -221,8 +221,8 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                         (int) 
evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
                         (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
-                        Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
-                        Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)),
+                        Math.min(currentParallelism, 
conf.get(VERTEX_MIN_PARALLELISM)),
+                        Math.max(currentParallelism, 
conf.get(VERTEX_MAX_PARALLELISM)),
                         autoScalerEventHandler,
                         context);
 
@@ -483,16 +483,28 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     }
 
     /**
-     * Computing the newParallelism. In general, newParallelism = 
currentParallelism * scaleFactor.
-     * But we limit newParallelism between parallelismLowerLimit and 
min(parallelismUpperLimit,
-     * maxParallelism).
+     * Computes the new parallelism for a vertex. In general, {@code 
newParallelism =
+     * currentParallelism * scaleFactor}, clamped to {@code 
[parallelismLowerLimit,
+     * min(parallelismUpperLimit, maxParallelism)]}.
      *
-     * <p>Also, in order to ensure the data is evenly spread across subtasks, 
we try to adjust the
-     * parallelism for source and keyed vertex such that it divides the 
maxParallelism without a
-     * remainder.
+     * <p>For source vertices (with known partition counts) and keyed vertices 
(with HASH inputs),
+     * the parallelism is further adjusted to align with the number of key 
groups or source
+     * partitions, according to the configured {@link 
KeyGroupOrPartitionsAdjustMode}.
      *
-     * <p>This method also attempts to adjust the parallelism to ensure it 
aligns well with the
-     * number of source partitions if a vertex has a known source partition 
count.
+     * <p>The alignment uses a two-phase algorithm:
+     *
+     * <ol>
+     *   <li><b>Phase 1 (upward search):</b> searches from the computed {@code 
newParallelism}
+     *       upward for divisor-aligned values. For scale-down, the search is 
capped at {@code
+     *       currentParallelism} to prevent the alignment from crossing into 
the scale-up direction.
+     *       If the nearest divisor is {@code currentParallelism} itself, it 
is returned
+     *       (effectively blocking the scale-down).
+     *   <li><b>Phase 2 (downward fallback):</b> if Phase 1 finds no match, 
searches downward from
+     *       {@code newParallelism} for the boundary where per-subtask load 
increases. A
+     *       direction-safety guard ensures the result never inverts the 
intended scaling direction
+     *       (scale-up results stay above {@code currentParallelism}, 
scale-down results stay
+     *       below).
+     * </ol>
      */
     @VisibleForTesting
     protected static <KEY, Context extends JobAutoScalerContext<KEY>> int 
scale(
@@ -548,10 +560,23 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         KeyGroupOrPartitionsAdjustMode mode =
                 
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source,
-        // we try to adjust the parallelism such that it divides
-        // the numKeyGroupsOrPartitions without a remainder => data is evenly 
spread across subtasks
-        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+        if (newParallelism == currentParallelism) {
+            return newParallelism;
+        }
+
+        boolean isScaleUp = newParallelism > currentParallelism;
+
+        // Phase 1: Upward search from newParallelism for divisor or 
approximate match.
+        // For scale-down, cap search at currentParallelism to prevent the 
alignment
+        // from crossing into the scale-up direction (direction-safety fix).
+        // Returning currentParallelism itself is allowed — it effectively 
blocks the
+        // scale-down since no aligned value was found strictly below it.
+        int phase1UpperBound =
+                isScaleUp
+                        ? upperBoundForAlignment
+                        : Math.min(currentParallelism, upperBoundForAlignment);
+
+        for (int p = newParallelism; p <= phase1UpperBound; p++) {
             if (numKeyGroupsOrPartitions % p == 0
                     ||
                     // When Mode is MAXIMIZE_UTILISATION , Try to find the 
smallest parallelism
@@ -575,14 +600,67 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 break;
             }
         }
-
         p = Math.max(p, parallelismLowerLimit);
+
+        // Direction-safety guard: prevent the alignment from inverting the 
scaling direction.
+        if ((isScaleUp && p <= currentParallelism) || (!isScaleUp && p >= 
currentParallelism)) {
+            LOG.warn(
+                    "Scaling of {} blocked: alignment could not find a value 
that preserves the "
+                            + "{} direction from current parallelism {} with 
mode {}. "
+                            + "Keeping current parallelism.",
+                    vertex,
+                    isScaleUp ? "scale-up" : "scale-down",
+                    currentParallelism,
+                    mode);
+            emitScalingLimitedEvent(
+                    vertex,
+                    newParallelism,
+                    currentParallelism,
+                    numKeyGroupsOrPartitions,
+                    upperBound,
+                    parallelismLowerLimit,
+                    eventHandler,
+                    context);
+            return currentParallelism;
+        }
+
+        if (p != newParallelism) {
+            emitScalingLimitedEvent(
+                    vertex,
+                    newParallelism,
+                    p,
+                    numKeyGroupsOrPartitions,
+                    upperBound,
+                    parallelismLowerLimit,
+                    eventHandler,
+                    context);
+        }
+        return p;
+    }
+
+    /**
+     * Emits a {@link #SCALING_LIMITED} warning event when the alignment logic 
had to deviate from
+     * the computed target parallelism, or when no aligned parallelism could 
be found and the
+     * scaling operation was blocked.
+     *
+     * @param expectedParallelism the originally computed target parallelism
+     * @param actualParallelism the parallelism that will actually be applied
+     */
+    private static <KEY, Context extends JobAutoScalerContext<KEY>> void 
emitScalingLimitedEvent(
+            JobVertexID vertex,
+            int expectedParallelism,
+            int actualParallelism,
+            int numKeyGroupsOrPartitions,
+            int upperBound,
+            int parallelismLowerLimit,
+            AutoScalerEventHandler<KEY, Context> eventHandler,
+            Context context) {
         var message =
                 String.format(
                         SCALE_LIMITED_MESSAGE_FORMAT,
                         vertex,
-                        newParallelism,
-                        p,
+                        expectedParallelism,
+                        actualParallelism,
                         numKeyGroupsOrPartitions,
                         upperBound,
                         parallelismLowerLimit);
@@ -591,9 +669,8 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 AutoScalerEventHandler.Type.Warning,
                 SCALING_LIMITED,
                 message,
-                SCALING_LIMITED + vertex + newParallelism,
+                SCALING_LIMITED + vertex,
                 context.getConfiguration().get(SCALING_EVENT_INTERVAL));
-        return p;
     }
 
     @VisibleForTesting
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 552971d7..7ba9e88c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -201,11 +201,13 @@ public class JobAutoScalerImplTest {
                         new ScalingRealizer<>() {
                             @Override
                             public void realizeConfigOverrides(
-                                    JobAutoScalerContext context, 
ConfigChanges configChanges) {}
+                                    JobAutoScalerContext<JobID> context,
+                                    ConfigChanges configChanges) {}
 
                             @Override
                             public void realizeParallelismOverrides(
-                                    JobAutoScalerContext context, Map 
parallelismOverrides) {
+                                    JobAutoScalerContext<JobID> context,
+                                    Map<String, String> parallelismOverrides) {
                                 throw new RuntimeException(
                                         "Test Realize Parallelism Overrides 
Exceptions.");
                             }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 3d085e17..c8ec9214 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -530,7 +530,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testMinParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+        conf.set(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
         conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
         var delayedScaleDown = new DelayedScaleDown();
 
@@ -560,7 +560,7 @@ public class JobVertexScalerTest {
 
     @Test
     public void testMaxParallelismLimitIsUsed() {
-        conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+        conf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
         conf.set(UTILIZATION_TARGET, 1.);
         var delayedScaleDown = new DelayedScaleDown();
 
@@ -1015,7 +1015,7 @@ public class JobVertexScalerTest {
                         context));
 
         assertEquals(
-                20,
+                22,
                 JobVertexScaler.scale(
                         vertex,
                         22,

Reply via email to