This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new bc7c5625 [hotfix] Avoid unnecessary scale-down when recommended
parallelism equals current parallelism in delay window.
bc7c5625 is described below
commit bc7c562514270c41da9a3eff83d968f8122aea06
Author: wangxinglong02 <[email protected]>
AuthorDate: Wed Mar 11 23:57:59 2026 +0800
[hotfix] Avoid unnecessary scale-down when recommended parallelism equals
current parallelism in delay window.
When the scale-down interval window selects the max recommended parallelism
that happens to match the current parallelism, which would cause an
IllegalArgumentException in ScalingSummary.
---
.../apache/flink/autoscaler/JobVertexScaler.java | 65 ++++++---
.../flink/autoscaler/JobVertexScalerTest.java | 146 ++++++++++++++-------
2 files changed, 146 insertions(+), 65 deletions(-)
diff --git
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index dfad27ad..395a873d 100644
---
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -100,19 +100,21 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
@Getter
public static class ParallelismChange {
- private static final ParallelismChange NO_CHANGE = new
ParallelismChange(-1, false);
-
private final int newParallelism;
+ private final int currentParallelism;
+
private final boolean outsideUtilizationBound;
- private ParallelismChange(int newParallelism, boolean
outsideUtilizationBound) {
+ private ParallelismChange(
+ int newParallelism, int currentParallelism, boolean
outsideUtilizationBound) {
this.newParallelism = newParallelism;
+ this.currentParallelism = currentParallelism;
this.outsideUtilizationBound = outsideUtilizationBound;
}
public boolean isNoChange() {
- return this == NO_CHANGE;
+ return newParallelism == currentParallelism;
}
@Override
@@ -124,33 +126,48 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
return false;
}
ParallelismChange that = (ParallelismChange) o;
+ if (this.isNoChange() && that.isNoChange()) {
+ // When no scaling happens, outsideUtilizationBound is
irrelevant.
+ return currentParallelism == that.currentParallelism;
+ }
return newParallelism == that.newParallelism
+ && currentParallelism == that.currentParallelism
&& outsideUtilizationBound == that.outsideUtilizationBound;
}
@Override
public int hashCode() {
- return Objects.hash(newParallelism, outsideUtilizationBound);
+ if (isNoChange()) {
+ // When no scaling happens, outsideUtilizationBound is
irrelevant.
+ return Objects.hash(currentParallelism);
+ }
+ return Objects.hash(newParallelism, currentParallelism,
outsideUtilizationBound);
}
@Override
public String toString() {
return isNoChange()
- ? "NoParallelismChange"
+ ? "NoParallelismChange{currentParallelism=" +
currentParallelism + "}"
: "ParallelismChange{newParallelism="
+ newParallelism
+ + ", currentParallelism="
+ + currentParallelism
+ ", outsideUtilizationBound="
+ outsideUtilizationBound
+ "}";
}
- public static ParallelismChange build(int newParallelism, boolean
outsideUtilizationBound) {
+ public static ParallelismChange build(
+ int newParallelism, int currentParallelism, boolean
outsideUtilizationBound) {
checkArgument(newParallelism > 0, "The parallelism should be
greater than 0.");
- return new ParallelismChange(newParallelism,
outsideUtilizationBound);
+ checkArgument(currentParallelism > 0, "The parallelism should be
greater than 0.");
+ return new ParallelismChange(
+ newParallelism, currentParallelism,
outsideUtilizationBound);
}
- public static ParallelismChange noChange() {
- return NO_CHANGE;
+ public static ParallelismChange noChange(int currentParallelism) {
+ checkArgument(currentParallelism > 0, "The parallelism should be
greater than 0.");
+ return new ParallelismChange(currentParallelism,
currentParallelism, false);
}
}
@@ -169,7 +186,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
LOG.warn(
"True processing rate is not available for {}, cannot
compute new parallelism",
vertex);
- return ParallelismChange.noChange();
+ return ParallelismChange.noChange(currentParallelism);
}
double targetCapacity =
@@ -179,7 +196,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
LOG.warn(
"Target data rate is not available for {}, cannot compute
new parallelism",
vertex);
- return ParallelismChange.noChange();
+ return ParallelismChange.noChange(currentParallelism);
}
LOG.debug("Target processing capacity for {} is {}", vertex,
targetCapacity);
@@ -230,7 +247,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
// Clear delayed scale down request if the new parallelism is
equal to
// currentParallelism.
delayedScaleDown.clearVertex(vertex);
- return ParallelismChange.noChange();
+ return ParallelismChange.noChange(currentParallelism);
}
// We record our expectations for this scaling operation
@@ -350,7 +367,8 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
// If we don't have past scaling actions for this vertex, don't
block scale up.
if (history.isEmpty()) {
- return ParallelismChange.build(newParallelism,
outsideUtilizationBound);
+ return ParallelismChange.build(
+ newParallelism, currentParallelism,
outsideUtilizationBound);
}
var lastSummary = history.get(history.lastKey());
@@ -359,13 +377,19 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
&& detectIneffectiveScaleUp(
context, vertex, conf, evaluatedMetrics,
lastSummary)) {
// Block scale up when last rescale is ineffective.
- return ParallelismChange.noChange();
+ return ParallelismChange.noChange(currentParallelism);
}
- return ParallelismChange.build(newParallelism,
outsideUtilizationBound);
+ return ParallelismChange.build(
+ newParallelism, currentParallelism,
outsideUtilizationBound);
} else {
return applyScaleDownInterval(
- delayedScaleDown, vertex, conf, newParallelism,
outsideUtilizationBound);
+ delayedScaleDown,
+ vertex,
+ conf,
+ newParallelism,
+ currentParallelism,
+ outsideUtilizationBound);
}
}
@@ -400,11 +424,13 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
JobVertexID vertex,
Configuration conf,
int newParallelism,
+ int currentParallelism,
boolean outsideUtilizationBound) {
var scaleDownInterval = conf.get(SCALE_DOWN_INTERVAL);
if (scaleDownInterval.toMillis() <= 0) {
// The scale down interval is disable, so don't block scaling.
- return ParallelismChange.build(newParallelism,
outsideUtilizationBound);
+ return ParallelismChange.build(
+ newParallelism, currentParallelism,
outsideUtilizationBound);
}
var now = clock.instant();
@@ -422,7 +448,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
"Try to skip immediate scale down within scale-down
interval for {}",
vertex);
}
- return ParallelismChange.noChange();
+ return ParallelismChange.noChange(currentParallelism);
} else {
// Using the maximum parallelism within the scale down interval
window instead of the
// latest parallelism when scaling down
@@ -430,6 +456,7 @@ public class JobVertexScaler<KEY, Context extends
JobAutoScalerContext<KEY>> {
delayedScaleDownInfo.getMaxRecommendedParallelism(windowStartTime);
return ParallelismChange.build(
maxRecommendedParallelism.getParallelism(),
+ currentParallelism,
maxRecommendedParallelism.isOutsideUtilizationBound());
}
}
diff --git
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index c8ec9214..22963c46 100644
---
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -106,7 +106,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(5, true),
+ ParallelismChange.build(5, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -118,7 +118,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
- ParallelismChange.build(8, false),
+ ParallelismChange.build(8, 10, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -130,7 +130,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(10),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -142,7 +142,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, .8);
assertEquals(
- ParallelismChange.build(8, false),
+ ParallelismChange.build(8, 10, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -153,7 +153,7 @@ public class JobVertexScalerTest {
delayedScaleDown));
assertEquals(
- ParallelismChange.build(8, false),
+ ParallelismChange.build(8, 10, false),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -165,7 +165,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, 0.5);
assertEquals(
- ParallelismChange.build(10, true),
+ ParallelismChange.build(10, 2, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -177,7 +177,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, 0.6);
assertEquals(
- ParallelismChange.build(4, true),
+ ParallelismChange.build(4, 2, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -190,7 +190,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
assertEquals(
- ParallelismChange.build(5, true),
+ ParallelismChange.build(5, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -202,7 +202,7 @@ public class JobVertexScalerTest {
conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
assertEquals(
- ParallelismChange.build(4, true),
+ ParallelismChange.build(4, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -215,7 +215,7 @@ public class JobVertexScalerTest {
conf.set(UTILIZATION_TARGET, 1.);
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
assertEquals(
- ParallelismChange.build(15, true),
+ ParallelismChange.build(15, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -227,7 +227,7 @@ public class JobVertexScalerTest {
conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
assertEquals(
- ParallelismChange.build(16, true),
+ ParallelismChange.build(16, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -535,7 +535,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(5, true),
+ ParallelismChange.build(5, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
@@ -547,7 +547,7 @@ public class JobVertexScalerTest {
// Make sure we respect current parallelism in case it's lower
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(4),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
@@ -565,7 +565,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(10),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
@@ -577,7 +577,7 @@ public class JobVertexScalerTest {
// Make sure we respect current parallelism in case it's higher
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(12),
vertexScaler.computeScaleTargetParallelism(
context,
new JobVertexID(),
@@ -595,7 +595,8 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
- assertParallelismChange(10, 50, 100, ParallelismChange.build(5, true),
delayedScaleDown);
+ assertParallelismChange(
+ 10, 50, 100, ParallelismChange.build(5, 10, true),
delayedScaleDown);
}
@Test
@@ -608,27 +609,27 @@ public class JobVertexScalerTest {
// The scale down never happen when scale down is first triggered.
vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
- assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
// The scale down never happen within scale down interval.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(40)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 720, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 720, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(59)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 640, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 640, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
// The parallelism result should be the max recommended parallelism
within the scale down
// interval.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(60)),
ZoneId.systemDefault()));
assertParallelismChange(
- 100, 700, 1000, ParallelismChange.build(90, false),
delayedScaleDown);
+ 100, 700, 1000, ParallelismChange.build(90, 100, false),
delayedScaleDown);
}
@Test
@@ -641,20 +642,20 @@ public class JobVertexScalerTest {
// The scale down never happen when scale down is first triggered.
vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
- assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
// The scale down never happen within scale down interval.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
// Allow immediate scale up
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(12)),
ZoneId.systemDefault()));
assertParallelismChange(
- 100, 1700, 1000, ParallelismChange.build(170, true),
delayedScaleDown);
+ 100, 1700, 1000, ParallelismChange.build(170, 100, true),
delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
}
@@ -668,19 +669,19 @@ public class JobVertexScalerTest {
// The scale down never happen when scale down is first triggered.
vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
- assertParallelismChange(100, 800, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
// The scale down never happen within scale down interval.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(10)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 900, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 900, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
// The delayed scale down is canceled when new parallelism is same
with current parallelism.
vertexScaler.setClock(
Clock.fixed(instant.plus(Duration.ofSeconds(12)),
ZoneId.systemDefault()));
- assertParallelismChange(100, 1000, 1000, ParallelismChange.noChange(),
delayedScaleDown);
+ assertParallelismChange(100, 1000, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
assertThat(delayedScaleDown.getDelayedVertices()).isEmpty();
}
@@ -714,7 +715,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(10, true),
+ ParallelismChange.build(10, 5, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -729,7 +730,7 @@ public class JobVertexScalerTest {
// Allow to scale higher if scaling was effective (80%)
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20, true),
+ ParallelismChange.build(20, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -745,7 +746,7 @@ public class JobVertexScalerTest {
// 90 -> 94. Do not try to scale above 20
evaluated = evaluated(20, 180, 94);
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -758,7 +759,7 @@ public class JobVertexScalerTest {
// Still considered ineffective (less than <10%)
evaluated = evaluated(20, 180, 98);
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -771,7 +772,7 @@ public class JobVertexScalerTest {
// Allow scale up if current parallelism doesnt match last (user
rescaled manually)
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20, true),
+ ParallelismChange.build(20, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -784,7 +785,7 @@ public class JobVertexScalerTest {
// Over 10%, effective
evaluated = evaluated(20, 180, 100);
assertEquals(
- ParallelismChange.build(36, true),
+ ParallelismChange.build(36, 20, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -799,7 +800,7 @@ public class JobVertexScalerTest {
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
false);
evaluated = evaluated(20, 180, 90);
assertEquals(
- ParallelismChange.build(40, true),
+ ParallelismChange.build(40, 20, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -814,7 +815,7 @@ public class JobVertexScalerTest {
// Allow scale down even if ineffective
evaluated = evaluated(20, 45, 90);
assertEquals(
- ParallelismChange.build(10, true),
+ ParallelismChange.build(10, 20, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -839,7 +840,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
assertEquals(
- ParallelismChange.build(10, true),
+ ParallelismChange.build(10, 5, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -854,7 +855,7 @@ public class JobVertexScalerTest {
// Effective scale, no events triggered
evaluated = evaluated(10, 180, 90);
assertEquals(
- ParallelismChange.build(20, true),
+ ParallelismChange.build(20, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -870,7 +871,7 @@ public class JobVertexScalerTest {
// Ineffective scale, an event is triggered
evaluated = evaluated(20, 180, 95);
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -896,7 +897,7 @@ public class JobVertexScalerTest {
ScalingMetric.TRUE_PROCESSING_RATE,
EvaluatedScalingMetric.avg(tpr.getAverage() + 0.01));
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -913,7 +914,7 @@ public class JobVertexScalerTest {
// Repeat ineffective scale with postive interval, no event is
triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL,
Duration.ofSeconds(1800));
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -927,7 +928,7 @@ public class JobVertexScalerTest {
// Ineffective scale with interval set to 0, an event is triggered
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
assertEquals(
- ParallelismChange.noChange(),
+ ParallelismChange.noChange(20),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -949,7 +950,7 @@ public class JobVertexScalerTest {
// Test ineffective scaling switched off
conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED,
false);
assertEquals(
- ParallelismChange.build(40, true),
+ ParallelismChange.build(40, 20, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -1096,7 +1097,7 @@ public class JobVertexScalerTest {
var delayedScaleDown = new DelayedScaleDown();
// partition limited
assertEquals(
- ParallelismChange.build(15, true),
+ ParallelismChange.build(15, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -1119,7 +1120,7 @@ public class JobVertexScalerTest {
smallChangesForScaleFactor.put(
ScalingMetric.NUM_SOURCE_PARTITIONS,
EvaluatedScalingMetric.of(15));
assertEquals(
- ParallelismChange.build(15, true),
+ ParallelismChange.build(15, 10, true),
vertexScaler.computeScaleTargetParallelism(
context,
jobVertexID,
@@ -1332,7 +1333,7 @@ public class JobVertexScalerTest {
currentTime, new ScalingSummary(8, 16,
linearScalingEvaluatedData3));
assertEquals(
- ParallelismChange.build(10, true),
+ ParallelismChange.build(10, 2, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -1357,7 +1358,7 @@ public class JobVertexScalerTest {
currentTime, new ScalingSummary(8, 16,
diminishingReturnsEvaluatedData3));
assertEquals(
- ParallelismChange.build(15, true),
+ ParallelismChange.build(15, 2, true),
vertexScaler.computeScaleTargetParallelism(
context,
op,
@@ -1367,4 +1368,57 @@ public class JobVertexScalerTest {
restartTime,
delayedScaleDown));
}
+
+ @Test
+ public void testMaxRecommendedParallelismUsedAfterScaleDownInterval() {
+ conf.set(UTILIZATION_TARGET, 1.);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+ var instant = Instant.now();
+
+ var delayedScaleDown = new DelayedScaleDown();
+
+ // t=0s: first trigger, newParallelism=50 (100*500/1000), scale down
delayed.
+ vertexScaler.setClock(Clock.fixed(instant, ZoneId.systemDefault()));
+ assertParallelismChange(100, 500, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
+ assertThat(delayedScaleDown.getDelayedVertices()).isNotEmpty();
+ var delayedInfo = delayedScaleDown.getDelayedVertices().get(vertex);
+ assertEquals(50,
delayedInfo.getRecommendedParallelisms().getFirst().getParallelism());
+
+ // t=30s: newParallelism=60 (100*600/1000), replaces 50 due to
monotonic decreasing
+ // property.
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(30)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 600, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
+ assertThat(delayedInfo.getRecommendedParallelisms()).hasSize(1);
+ assertEquals(60,
delayedInfo.getRecommendedParallelisms().getFirst().getParallelism());
+
+ // t=65s: interval expired (windowStart=5s).
maxRecommendedParallelism=60 !=
+ // currentParallelism(100) -> build(60, true).
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(65)),
ZoneId.systemDefault()));
+ assertParallelismChange(
+ 100, 600, 1000, ParallelismChange.build(60, 100, true),
delayedScaleDown);
+ }
+
+ @Test
+ public void
testMaxRecommendedParallelismEqualsCurrentParallelismReturnsNoChange() {
+ // When maxRecommendedParallelism == currentParallelism after interval
expires,
+ // NO_CHANGE is returned even if outsideUtilizationBound is true.
+ // Pre-inject recommendation=100 (outsideUtilizationBound=true) at
t=5s (= windowStart at
+ // t=65s).
+ // At t=65s with currentParallelism=100: newParallelism=80
(100*800/1000).
+ // recordRecommendedParallelism(80): (5s,100) > 80 survives ->
[(5s,100),(65s,80)].
+ // getMaxRecommendedParallelism(windowStart=5s): (5s,100) not evicted
-> peekFirst=100.
+ // maxRecommendedParallelism(100) == currentParallelism(100) ->
NO_CHANGE.
+ conf.set(UTILIZATION_TARGET, 1.);
+ conf.set(AutoScalerOptions.SCALE_DOWN_INTERVAL, Duration.ofMinutes(1));
+ var instant = Instant.now();
+
+ var delayedScaleDown = new DelayedScaleDown();
+ delayedScaleDown.triggerScaleDown(vertex,
instant.plus(Duration.ofSeconds(5)), 100, true);
+
+ vertexScaler.setClock(
+ Clock.fixed(instant.plus(Duration.ofSeconds(65)),
ZoneId.systemDefault()));
+ assertParallelismChange(100, 800, 1000,
ParallelismChange.noChange(100), delayedScaleDown);
+ }
}