This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new bc7c5625 [hotfix] Avoid unnecessary scale-down when recommended 
parallelism equals current parallelism in delay window.
bc7c5625 is described below

commit bc7c562514270c41da9a3eff83d968f8122aea06
Author: wangxinglong02 <[email protected]>
AuthorDate: Wed Mar 11 23:57:59 2026 +0800

    [hotfix] Avoid unnecessary scale-down when recommended parallelism equals 
current parallelism in delay window.
    
    When the scale-down interval window selects a max recommended parallelism 
that happens to match the current parallelism, the result was still treated as 
a scale-down, which would cause an IllegalArgumentException in ScalingSummary. 
Such a result is now reported as no change.
---
 .../apache/flink/autoscaler/JobVertexScaler.java   |  65 ++++++---
 .../flink/autoscaler/JobVertexScalerTest.java      | 146 ++++++++++++++-------
 2 files changed, 146 insertions(+), 65 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index dfad27ad..395a873d 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -100,19 +100,21 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     @Getter
     public static class ParallelismChange {
 
-        private static final ParallelismChange NO_CHANGE = new 
ParallelismChange(-1, false);
-
         private final int newParallelism;
 
+        private final int currentParallelism;
+
         private final boolean outsideUtilizationBound;
 
-        private ParallelismChange(int newParallelism, boolean 
outsideUtilizationBound) {
+        private ParallelismChange(
+                int newParallelism, int currentParallelism, boolean 
outsideUtilizationBound) {
             this.newParallelism = newParallelism;
+            this.currentParallelism = currentParallelism;
             this.outsideUtilizationBound = outsideUtilizationBound;
         }
 
         public boolean isNoChange() {
-            return this == NO_CHANGE;
+            return newParallelism == currentParallelism;
         }
 
         @Override
@@ -124,33 +126,48 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 return false;
             }
             ParallelismChange that = (ParallelismChange) o;
+            if (this.isNoChange() && that.isNoChange()) {
+                // When no scaling happens, outsideUtilizationBound is 
irrelevant.
+                return currentParallelism == that.currentParallelism;
+            }
             return newParallelism == that.newParallelism
+                    && currentParallelism == that.currentParallelism
                     && outsideUtilizationBound == that.outsideUtilizationBound;
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(newParallelism, outsideUtilizationBound);
+            if (isNoChange()) {
+                // When no scaling happens, outsideUtilizationBound is 
irrelevant.
+                return Objects.hash(currentParallelism);
+            }
+            return Objects.hash(newParallelism, currentParallelism, 
outsideUtilizationBound);
         }
 
         @Override
         public String toString() {
             return isNoChange()
-                    ? "NoParallelismChange"
+                    ? "NoParallelismChange{currentParallelism=" + 
currentParallelism + "}"
                     : "ParallelismChange{newParallelism="
                             + newParallelism
+                            + ", currentParallelism="
+                            + currentParallelism
                             + ", outsideUtilizationBound="
                             + outsideUtilizationBound
                             + "}";
         }
 
-        public static ParallelismChange build(int newParallelism, boolean 
outsideUtilizationBound) {
+        public static ParallelismChange build(
+                int newParallelism, int currentParallelism, boolean 
outsideUtilizationBound) {
             checkArgument(newParallelism > 0, "The parallelism should be 
greater than 0.");
-            return new ParallelismChange(newParallelism, 
outsideUtilizationBound);
+            checkArgument(currentParallelism > 0, "The parallelism should be 
greater than 0.");
+            return new ParallelismChange(
+                    newParallelism, currentParallelism, 
outsideUtilizationBound);
         }
 
-        public static ParallelismChange noChange() {
-            return NO_CHANGE;
+        public static ParallelismChange noChange(int currentParallelism) {
+            checkArgument(currentParallelism > 0, "The parallelism should be 
greater than 0.");
+            return new ParallelismChange(currentParallelism, 
currentParallelism, false);
         }
     }
 
@@ -169,7 +186,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             LOG.warn(
                     "True processing rate is not available for {}, cannot 
compute new parallelism",
                     vertex);
-            return ParallelismChange.noChange();
+            return ParallelismChange.noChange(currentParallelism);
         }
 
         double targetCapacity =
@@ -179,7 +196,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             LOG.warn(
                     "Target data rate is not available for {}, cannot compute 
new parallelism",
                     vertex);
-            return ParallelismChange.noChange();
+            return ParallelismChange.noChange(currentParallelism);
         }
 
         LOG.debug("Target processing capacity for {} is {}", vertex, 
targetCapacity);
@@ -230,7 +247,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             // Clear delayed scale down request if the new parallelism is 
equal to
             // currentParallelism.
             delayedScaleDown.clearVertex(vertex);
-            return ParallelismChange.noChange();
+            return ParallelismChange.noChange(currentParallelism);
         }
 
         // We record our expectations for this scaling operation
@@ -350,7 +367,8 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
             // If we don't have past scaling actions for this vertex, don't 
block scale up.
             if (history.isEmpty()) {
-                return ParallelismChange.build(newParallelism, 
outsideUtilizationBound);
+                return ParallelismChange.build(
+                        newParallelism, currentParallelism, 
outsideUtilizationBound);
             }
 
             var lastSummary = history.get(history.lastKey());
@@ -359,13 +377,19 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                     && detectIneffectiveScaleUp(
                             context, vertex, conf, evaluatedMetrics, 
lastSummary)) {
                 // Block scale up when last rescale is ineffective.
-                return ParallelismChange.noChange();
+                return ParallelismChange.noChange(currentParallelism);
             }
 
-            return ParallelismChange.build(newParallelism, 
outsideUtilizationBound);
+            return ParallelismChange.build(
+                    newParallelism, currentParallelism, 
outsideUtilizationBound);
         } else {
             return applyScaleDownInterval(
-                    delayedScaleDown, vertex, conf, newParallelism, 
outsideUtilizationBound);
+                    delayedScaleDown,
+                    vertex,
+                    conf,
+                    newParallelism,
+                    currentParallelism,
+                    outsideUtilizationBound);
         }
     }
 
@@ -400,11 +424,13 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             JobVertexID vertex,
             Configuration conf,
             int newParallelism,
+            int currentParallelism,
             boolean outsideUtilizationBound) {
         var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
         if (scaleDownInterval.toMillis() <= 0) {
             // The scale down interval is disable, so don't block scaling.
-            return ParallelismChange.build(newParallelism, 
outsideUtilizationBound);
+            return ParallelismChange.build(
+                    newParallelism, currentParallelism, 
outsideUtilizationBound);
         }
 
         var now = clock.instant();
@@ -422,7 +448,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                         "Try to skip immediate scale down within scale-down 
interval for {}",
                         vertex);
             }
-            return ParallelismChange.noChange();
+            return ParallelismChange.noChange(currentParallelism);
         } else {
             // Using the maximum parallelism within the scale down interval 
window instead of the
             // latest parallelism when scaling down
@@ -430,6 +456,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                     
delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
             return ParallelismChange.build(
                     maxRecommendedParallelism.getParallelism(),
+                    currentParallelism,
                     maxRecommendedParallelism.isOutsideUtilizationBound());
         }
     }
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index c8ec9214..22963c46 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -106,7 +106,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5, true),
+                ParallelismChange.build(5, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -118,7 +118,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8, false),
+                ParallelismChange.build(8, 10, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -130,7 +130,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -142,7 +142,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, .8);
         assertEquals(
-                ParallelismChange.build(8, false),
+                ParallelismChange.build(8, 10, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -153,7 +153,7 @@ public class JobVertexScalerTest {
                         delayedScaleDown));
 
         assertEquals(
-                ParallelismChange.build(8, false),
+                ParallelismChange.build(8, 10, false),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -165,7 +165,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.5);
         assertEquals(
-                ParallelismChange.build(10, true),
+                ParallelismChange.build(10, 2, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -177,7 +177,7 @@ public class JobVertexScalerTest {
 
         conf.set(UTILIZATION_TARGET, 0.6);
         assertEquals(
-                ParallelismChange.build(4, true),
+                ParallelismChange.build(4, 2, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -190,7 +190,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(5, true),
+                ParallelismChange.build(5, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -202,7 +202,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(4, true),
+                ParallelismChange.build(4, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -215,7 +215,7 @@ public class JobVertexScalerTest {
         conf.set(UTILIZATION_TARGET, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
         assertEquals(
-                ParallelismChange.build(15, true),
+                ParallelismChange.build(15, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -227,7 +227,7 @@ public class JobVertexScalerTest {
 
         conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
         assertEquals(
-                ParallelismChange.build(16, true),
+                ParallelismChange.build(16, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -535,7 +535,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(5, true),
+                ParallelismChange.build(5, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -547,7 +547,7 @@ public class JobVertexScalerTest {
 
         // Make sure we respect current parallelism in case it's lower
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(4),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -565,7 +565,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(10),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -577,7 +577,7 @@ public class JobVertexScalerTest {
 
         // Make sure we respect current parallelism in case it's higher
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(12),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
@@ -595,7 +595,8 @@ public class JobVertexScalerTest {
 
         var delayedScaleDown = new DelayedScaleDown();
 
-        assertParallelismChange(10, 50, 100, ParallelismChange.build(5, true), 
delayedScaleDown);
+        assertParallelismChange(
+                10, 50, 100, ParallelismChange.build(5, 10, true), 
delayedScaleDown);
     }
 
     @Test
@@ -608,27 +609,27 @@ public class JobVertexScalerTest {
 
         // The scale down never happen when scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
 
         // The scale down never happen within scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(40)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 720, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 720, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
 
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(59)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 640, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 640, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
 
         // The parallelism result should be the max recommended parallelism 
within the scale down
         // interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(60)), 
ZoneId.systemDefault()));
         assertParallelismChange(
-                100, 700, 1000, ParallelismChange.build(90, false), 
delayedScaleDown);
+                100, 700, 1000, ParallelismChange.build(90, 100, false), 
delayedScaleDown);
     }
 
     @Test
@@ -641,20 +642,20 @@ public class JobVertexScalerTest {
 
         // The scale down never happen when scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // The scale down never happen within scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // Allow immediate scale up
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), 
ZoneId.systemDefault()));
         assertParallelismChange(
-                100, 1700, 1000, ParallelismChange.build(170, true), 
delayedScaleDown);
+                100, 1700, 1000, ParallelismChange.build(170, 100, true), 
delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
@@ -668,19 +669,19 @@ public class JobVertexScalerTest {
 
         // The scale down never happen when scale down is first triggered.
         vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
-        assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // The scale down never happen within scale down interval.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(10)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 900, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
 
         // The delayed scale down is canceled when new parallelism is same 
with current parallelism.
         vertexScaler.setClock(
                 Clock.fixed(instant.plus(Duration.ofSeconds(12)), 
ZoneId.systemDefault()));
-        assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(), 
delayedScaleDown);
+        assertParallelismChange(100, 1000, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
         assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
     }
 
@@ -714,7 +715,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10, true),
+                ParallelismChange.build(10, 5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -729,7 +730,7 @@ public class JobVertexScalerTest {
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20, true),
+                ParallelismChange.build(20, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -745,7 +746,7 @@ public class JobVertexScalerTest {
         // 90 -> 94. Do not try to scale above 20
         evaluated = evaluated(20, 180, 94);
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -758,7 +759,7 @@ public class JobVertexScalerTest {
         // Still considered ineffective (less than <10%)
         evaluated = evaluated(20, 180, 98);
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -771,7 +772,7 @@ public class JobVertexScalerTest {
         // Allow scale up if current parallelism doesnt match last (user 
rescaled manually)
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20, true),
+                ParallelismChange.build(20, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -784,7 +785,7 @@ public class JobVertexScalerTest {
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
-                ParallelismChange.build(36, true),
+                ParallelismChange.build(36, 20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -799,7 +800,7 @@ public class JobVertexScalerTest {
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
         evaluated = evaluated(20, 180, 90);
         assertEquals(
-                ParallelismChange.build(40, true),
+                ParallelismChange.build(40, 20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -814,7 +815,7 @@ public class JobVertexScalerTest {
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
         assertEquals(
-                ParallelismChange.build(10, true),
+                ParallelismChange.build(10, 20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -839,7 +840,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
 
         assertEquals(
-                ParallelismChange.build(10, true),
+                ParallelismChange.build(10, 5, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -854,7 +855,7 @@ public class JobVertexScalerTest {
         // Effective scale, no events triggered
         evaluated = evaluated(10, 180, 90);
         assertEquals(
-                ParallelismChange.build(20, true),
+                ParallelismChange.build(20, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -870,7 +871,7 @@ public class JobVertexScalerTest {
         // Ineffective scale, an event is triggered
         evaluated = evaluated(20, 180, 95);
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -896,7 +897,7 @@ public class JobVertexScalerTest {
                 ScalingMetric.TRUE_PROCESSING_RATE,
                 EvaluatedScalingMetric.avg(tpr.getAverage() + 0.01));
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -913,7 +914,7 @@ public class JobVertexScalerTest {
         // Repeat ineffective scale with postive interval, no event is 
triggered
         conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, 
Duration.ofSeconds(1800));
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -927,7 +928,7 @@ public class JobVertexScalerTest {
         // Ineffective scale with interval set to 0, an event is triggered
         conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
         assertEquals(
-                ParallelismChange.noChange(),
+                ParallelismChange.noChange(20),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -949,7 +950,7 @@ public class JobVertexScalerTest {
         // Test ineffective scaling switched off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
         assertEquals(
-                ParallelismChange.build(40, true),
+                ParallelismChange.build(40, 20, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1096,7 +1097,7 @@ public class JobVertexScalerTest {
         var delayedScaleDown = new DelayedScaleDown();
         // partition limited
         assertEquals(
-                ParallelismChange.build(15, true),
+                ParallelismChange.build(15, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1119,7 +1120,7 @@ public class JobVertexScalerTest {
         smallChangesForScaleFactor.put(
                 ScalingMetric.NUM_SOURCE_PARTITIONS, 
EvaluatedScalingMetric.of(15));
         assertEquals(
-                ParallelismChange.build(15, true),
+                ParallelismChange.build(15, 10, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         jobVertexID,
@@ -1332,7 +1333,7 @@ public class JobVertexScalerTest {
                 currentTime, new ScalingSummary(8, 16, 
linearScalingEvaluatedData3));
 
         assertEquals(
-                ParallelismChange.build(10, true),
+                ParallelismChange.build(10, 2, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -1357,7 +1358,7 @@ public class JobVertexScalerTest {
                 currentTime, new ScalingSummary(8, 16, 
diminishingReturnsEvaluatedData3));
 
         assertEquals(
-                ParallelismChange.build(15, true),
+                ParallelismChange.build(15, 2, true),
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
@@ -1367,4 +1368,57 @@ public class JobVertexScalerTest {
                         restartTime,
                         delayedScaleDown));
     }
+
+    @Test
+    public void testMaxRecommendedParallelismUsedAfterScaleDownInterval() {
+        conf.set(UTILIZATION_TARGET, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+        var instant = Instant.now();
+
+        var delayedScaleDown = new DelayedScaleDown();
+
+        // t=0s: first trigger, newParallelism=50 (100*500/1000), scale down 
delayed.
+        vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+        assertParallelismChange(100, 500, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
+        assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
+        var delayedInfo = delayedScaleDown.getDelayedVertices().get(vertex);
+        assertEquals(50, 
delayedInfo.getRecommendedParallelisms().getFirst().getParallelism());
+
+        // t=30s: newParallelism=60 (100*600/1000), replaces 50 due to 
monotonic decreasing
+        // property.
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(30)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 600, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
+        assertThat(delayedInfo.getRecommendedParallelisms()).hasSize(1);
+        assertEquals(60, 
delayedInfo.getRecommendedParallelisms().getFirst().getParallelism());
+
+        // t=65s: interval expired (windowStart=5s). 
maxRecommendedParallelism=60 !=
+        // currentParallelism(100) -> build(60, 100, true).
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(65)), 
ZoneId.systemDefault()));
+        assertParallelismChange(
+                100, 600, 1000, ParallelismChange.build(60, 100, true), 
delayedScaleDown);
+    }
+
+    @Test
+    public void 
testMaxRecommendedParallelismEqualsCurrentParallelismReturnsNoChange() {
+        // When maxRecommendedParallelism == currentParallelism after interval 
expires,
+        // a no-change result is returned even if outsideUtilizationBound is true.
+        // Pre-inject recommendation=100 (outsideUtilizationBound=true) at 
t=5s (= windowStart at
+        // t=65s).
+        // At t=65s with currentParallelism=100: newParallelism=80 
(100*800/1000).
+        // recordRecommendedParallelism(80): (5s,100) > 80 survives -> 
[(5s,100),(65s,80)].
+        // getMaxRecommendedParallelism(windowStart=5s): (5s,100) not evicted 
-> peekFirst=100.
+        // maxRecommendedParallelism(100) == currentParallelism(100) -> 
noChange(100).
+        conf.set(UTILIZATION_TARGET, 1.);
+        conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+        var instant = Instant.now();
+
+        var delayedScaleDown = new DelayedScaleDown();
+        delayedScaleDown.triggerScaleDown(vertex, 
instant.plus(Duration.ofSeconds(5)), 100, true);
+
+        vertexScaler.setClock(
+                Clock.fixed(instant.plus(Duration.ofSeconds(65)), 
ZoneId.systemDefault()));
+        assertParallelismChange(100, 800, 1000, 
ParallelismChange.noChange(100), delayedScaleDown);
+    }
 }

Reply via email to