This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 14f04033 [FLINK-39299][flink-autoscaler] Inconsistent vertex parallelism alignment logic
14f04033 is described below
commit 14f04033fc18fbde91bca14b6d38bbdee4ad3722
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Thu Apr 2 16:15:06 2026 +0300
[FLINK-39299][flink-autoscaler] Inconsistent vertex parallelism alignment logic
---
.../apache/flink/autoscaler/JobVertexScaler.java | 115 +++++++++++++++++----
.../flink/autoscaler/JobAutoScalerImplTest.java | 6 +-
.../flink/autoscaler/JobVertexScalerTest.java | 6 +-
3 files changed, 103 insertions(+), 24 deletions(-)
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 492615f4..dfad27ad 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -221,8 +221,8 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
(int)
evaluatedMetrics.get(NUM_SOURCE_PARTITIONS).getCurrent(),
(int)
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
scaleFactor,
- Math.min(currentParallelism,
conf.getInteger(VERTEX_MIN_PARALLELISM)),
- Math.max(currentParallelism,
conf.getInteger(VERTEX_MAX_PARALLELISM)),
+ Math.min(currentParallelism,
conf.get(VERTEX_MIN_PARALLELISM)),
+ Math.max(currentParallelism,
conf.get(VERTEX_MAX_PARALLELISM)),
autoScalerEventHandler,
context);
@@ -483,16 +483,28 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
}
/**
- * Computing the newParallelism. In general, newParallelism =
currentParallelism * scaleFactor.
- * But we limit newParallelism between parallelismLowerLimit and
min(parallelismUpperLimit,
- * maxParallelism).
+ * Computes the new parallelism for a vertex. In general, {@code
newParallelism =
+ * currentParallelism * scaleFactor}, clamped to {@code
[parallelismLowerLimit,
+ * min(parallelismUpperLimit, maxParallelism)]}.
*
- * <p>Also, in order to ensure the data is evenly spread across subtasks,
we try to adjust the
- * parallelism for source and keyed vertex such that it divides the
maxParallelism without a
- * remainder.
+ * <p>For source vertices (with known partition counts) and keyed vertices
(with HASH inputs),
+ * the parallelism is further adjusted to align with the number of key
groups or source
+ * partitions, according to the configured {@link
KeyGroupOrPartitionsAdjustMode}.
*
- * <p>This method also attempts to adjust the parallelism to ensure it
aligns well with the
- * number of source partitions if a vertex has a known source partition
count.
+ * <p>The alignment uses a two-phase algorithm:
+ *
+ * <ol>
+ * <li><b>Phase 1 (upward search):</b> searches from the computed {@code
newParallelism}
+ * upward for divisor-aligned values. For scale-down, the search is
capped at {@code
+ * currentParallelism} to prevent the alignment from crossing into
the scale-up direction.
+ * If the nearest divisor is {@code currentParallelism} itself, it
is returned
+ * (effectively blocking the scale-down).
+ * <li><b>Phase 2 (downward fallback):</b> if Phase 1 finds no match,
searches downward from
+ * {@code newParallelism} for the boundary where per-subtask load
increases. A
+ * direction-safety guard ensures the result never inverts the
intended scaling direction
+ * (scale-up results stay above {@code currentParallelism},
scale-down results stay
+ * below).
+ * </ol>
*/
@VisibleForTesting
protected static <KEY, Context extends JobAutoScalerContext<KEY>> int
scale(
@@ -548,10 +560,23 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
KeyGroupOrPartitionsAdjustMode mode =
context.getConfiguration().get(SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE);
- // When the shuffle type of vertex inputs contains keyBy or vertex is
a source,
- // we try to adjust the parallelism such that it divides
- // the numKeyGroupsOrPartitions without a remainder => data is evenly
spread across subtasks
- for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+ if (newParallelism == currentParallelism) {
+ return newParallelism;
+ }
+
+ boolean isScaleUp = newParallelism > currentParallelism;
+
+ // Phase 1: Upward search from newParallelism for divisor or
approximate match.
+ // For scale-down, cap search at currentParallelism to prevent the
alignment
+ // from crossing into the scale-up direction (direction-safety fix).
+ // Returning currentParallelism itself is allowed — it effectively
blocks the
+ // scale-down since no aligned value was found strictly below it.
+ int phase1UpperBound =
+ isScaleUp
+ ? upperBoundForAlignment
+ : Math.min(currentParallelism, upperBoundForAlignment);
+
+ for (int p = newParallelism; p <= phase1UpperBound; p++) {
if (numKeyGroupsOrPartitions % p == 0
||
// When Mode is MAXIMIZE_UTILISATION , Try to find the
smallest parallelism
@@ -575,14 +600,67 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
break;
}
}
-
p = Math.max(p, parallelismLowerLimit);
+
+ // Direction-safety guard: prevent the alignment from inverting the
scaling direction.
+ if ((isScaleUp && p <= currentParallelism) || (!isScaleUp && p >=
currentParallelism)) {
+ LOG.warn(
+ "Scaling of {} blocked: alignment could not find a value
that preserves the "
+ + "{} direction from current parallelism {} with
mode {}. "
+ + "Keeping current parallelism.",
+ vertex,
+ isScaleUp ? "scale-up" : "scale-down",
+ currentParallelism,
+ mode);
+ emitScalingLimitedEvent(
+ vertex,
+ newParallelism,
+ currentParallelism,
+ numKeyGroupsOrPartitions,
+ upperBound,
+ parallelismLowerLimit,
+ eventHandler,
+ context);
+ return currentParallelism;
+ }
+
+ if (p != newParallelism) {
+ emitScalingLimitedEvent(
+ vertex,
+ newParallelism,
+ p,
+ numKeyGroupsOrPartitions,
+ upperBound,
+ parallelismLowerLimit,
+ eventHandler,
+ context);
+ }
+ return p;
+ }
+
+ /**
+ * Emits a {@link #SCALING_LIMITED} warning event when the alignment logic
had to deviate from
+ * the computed target parallelism, or when no aligned parallelism could
be found and the
+ * scaling operation was blocked.
+ *
+ * @param expectedParallelism the originally computed target parallelism
+ * @param actualParallelism the parallelism that will actually be applied
+ */
+ private static <KEY, Context extends JobAutoScalerContext<KEY>> void
emitScalingLimitedEvent(
+ JobVertexID vertex,
+ int expectedParallelism,
+ int actualParallelism,
+ int numKeyGroupsOrPartitions,
+ int upperBound,
+ int parallelismLowerLimit,
+ AutoScalerEventHandler<KEY, Context> eventHandler,
+ Context context) {
var message =
String.format(
SCALE_LIMITED_MESSAGE_FORMAT,
vertex,
- newParallelism,
- p,
+ expectedParallelism,
+ actualParallelism,
numKeyGroupsOrPartitions,
upperBound,
parallelismLowerLimit);
@@ -591,9 +669,8 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
AutoScalerEventHandler.Type.Warning,
SCALING_LIMITED,
message,
- SCALING_LIMITED + vertex + newParallelism,
+ SCALING_LIMITED + vertex,
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
- return p;
}
@VisibleForTesting
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
index 552971d7..7ba9e88c 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobAutoScalerImplTest.java
@@ -201,11 +201,13 @@ public class JobAutoScalerImplTest {
new ScalingRealizer<>() {
@Override
public void realizeConfigOverrides(
- JobAutoScalerContext context,
ConfigChanges configChanges) {}
+ JobAutoScalerContext<JobID> context,
+ ConfigChanges configChanges) {}
@Override
public void realizeParallelismOverrides(
- JobAutoScalerContext context, Map
parallelismOverrides) {
+ JobAutoScalerContext<JobID> context,
+ Map<String, String> parallelismOverrides) {
throw new RuntimeException(
"Test Realize Parallelism Overrides
Exceptions.");
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 3d085e17..c8ec9214 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -530,7 +530,7 @@ public class JobVertexScalerTest {
@Test
public void testMinParallelismLimitIsUsed() {
- conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
+ conf.set(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ZERO);
var delayedScaleDown = new DelayedScaleDown();
@@ -560,7 +560,7 @@ public class JobVertexScalerTest {
@Test
public void testMaxParallelismLimitIsUsed() {
- conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
+ conf.set(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
conf.set(UTILIZATION_TARGET, 1.);
var delayedScaleDown = new DelayedScaleDown();
@@ -1015,7 +1015,7 @@ public class JobVertexScalerTest {
context));
assertEquals(
- 20,
+ 22,
JobVertexScaler.scale(
vertex,
22,