This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 58b8ea8c [FLINK-33993] Fix misleading events for scaling effectiveness detection (#748) 58b8ea8c is described below commit 58b8ea8ca3ce12d6f7c8d0b893a5e2d29c506cfd Author: Maximilian Michels <m...@apache.org> AuthorDate: Fri Jan 5 10:58:53 2024 +0100 [FLINK-33993] Fix misleading events for scaling effectiveness detection (#748) When the ineffective scaling decision feature is turned off, events are still generated that look like this: ``` Skipping further scale up after ineffective previous scale up for 65c763af14a952c064c400d516c25529 ``` This is misleading because no action will be taken. It is fair to inform users about ineffective scale up even when the feature is disabled, but a different message should be printed to convey that no action will be taken. --- .../apache/flink/autoscaler/JobVertexScaler.java | 15 +++++++++++--- .../flink/autoscaler/JobVertexScalerTest.java | 23 ++++++++++++++++++++-- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java index 915cd2c4..01f6d940 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java @@ -58,7 +58,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> { @VisibleForTesting protected static final String INEFFECTIVE_MESSAGE_FORMAT = - "Skipping further scale up after ineffective previous scale up for %s"; + "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). 
Blocking of ineffective scaling decisions is %s"; private Clock clock = Clock.system(ZoneId.systemDefault()); @@ -214,7 +214,16 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> { return false; } - var message = String.format(INEFFECTIVE_MESSAGE_FORMAT, vertex); + boolean blockIneffectiveScalings = + conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED); + + var message = + String.format( + INEFFECTIVE_MESSAGE_FORMAT, + vertex, + expectedIncrease, + actualIncrease, + blockIneffectiveScalings ? "enabled" : "disabled"); autoScalerEventHandler.handleEvent( context, @@ -224,7 +233,7 @@ public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> { null, conf.get(SCALING_EVENT_INTERVAL)); - if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { + if (blockIneffectiveScalings) { LOG.warn( "Ineffective scaling detected for {}, expected increase {}, actual {}", vertex, diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java index 0f9242e7..f87fbebe 100644 --- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java @@ -431,7 +431,9 @@ public class JobVertexScalerTest { var event = eventCollector.events.poll(); assertThat(event).isNotNull(); assertThat(event.getMessage()) - .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID)); + .isEqualTo( + String.format( + INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 5.0, "enabled")); assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); assertEquals(1, event.getCount()); @@ -463,9 +465,26 @@ public class JobVertexScalerTest { event = eventCollector.events.poll(); assertThat(event).isNotNull(); assertThat(event.getMessage()) - .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID)); 
+ .isEqualTo( + String.format( + INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 5.0, "enabled")); assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); assertEquals(2, event.getCount()); + + // Test ineffective scaling switched off + conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false); + assertEquals( + 40, + vertexScaler.computeScaleTargetParallelism( + context, jobVertexID, evaluated, history, restartTime)); + assertEquals(1, eventCollector.events.size()); + event = eventCollector.events.poll(); + assertThat(event).isNotNull(); + assertThat(event.getMessage()) + .isEqualTo( + String.format( + INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 5.0, "disabled")); + assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING); } private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(