This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 58b8ea8c [FLINK-33993] Fix misleading events for scaling effectiveness detection (#748)
58b8ea8c is described below

commit 58b8ea8ca3ce12d6f7c8d0b893a5e2d29c506cfd
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Fri Jan 5 10:58:53 2024 +0100

    [FLINK-33993] Fix misleading events for scaling effectiveness detection (#748)
    
    When blocking of ineffective scaling decisions is turned off, events are
    still generated which look like this:
    
    ```
    Skipping further scale up after ineffective previous scale up for 65c763af14a952c064c400d516c25529
    ```
    
    This is misleading because no action will be taken. It is fair to inform
    users about an ineffective scale up even when the feature is disabled, but
    a different message should be printed to convey that no action will be
    taken.
---
 .../apache/flink/autoscaler/JobVertexScaler.java   | 15 +++++++++++---
 .../flink/autoscaler/JobVertexScalerTest.java      | 23 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 915cd2c4..01f6d940 100644
--- 
a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ 
b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -58,7 +58,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
 
     @VisibleForTesting
     protected static final String INEFFECTIVE_MESSAGE_FORMAT =
-            "Skipping further scale up after ineffective previous scale up for 
%s";
+            "Ineffective scaling detected for %s (expected increase: %s, 
actual increase %s). Blocking of ineffective scaling decisions is %s";
 
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
@@ -214,7 +214,16 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             return false;
         }
 
-        var message = String.format(INEFFECTIVE_MESSAGE_FORMAT, vertex);
+        boolean blockIneffectiveScalings =
+                
conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED);
+
+        var message =
+                String.format(
+                        INEFFECTIVE_MESSAGE_FORMAT,
+                        vertex,
+                        expectedIncrease,
+                        actualIncrease,
+                        blockIneffectiveScalings ? "enabled" : "disabled");
 
         autoScalerEventHandler.handleEvent(
                 context,
@@ -224,7 +233,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                 null,
                 conf.get(SCALING_EVENT_INTERVAL));
 
-        if 
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
+        if (blockIneffectiveScalings) {
             LOG.warn(
                     "Ineffective scaling detected for {}, expected increase 
{}, actual {}",
                     vertex,
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 0f9242e7..f87fbebe 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -431,7 +431,9 @@ public class JobVertexScalerTest {
         var event = eventCollector.events.poll();
         assertThat(event).isNotNull();
         assertThat(event.getMessage())
-                .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, 
jobVertexID));
+                .isEqualTo(
+                        String.format(
+                                INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 
5.0, "enabled"));
         assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
         assertEquals(1, event.getCount());
 
@@ -463,9 +465,26 @@ public class JobVertexScalerTest {
         event = eventCollector.events.poll();
         assertThat(event).isNotNull();
         assertThat(event.getMessage())
-                .isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, 
jobVertexID));
+                .isEqualTo(
+                        String.format(
+                                INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 
5.0, "enabled"));
         assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
         assertEquals(2, event.getCount());
+
+        // Test ineffective scaling switched off
+        conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
+        assertEquals(
+                40,
+                vertexScaler.computeScaleTargetParallelism(
+                        context, jobVertexID, evaluated, history, 
restartTime));
+        assertEquals(1, eventCollector.events.size());
+        event = eventCollector.events.poll();
+        assertThat(event).isNotNull();
+        assertThat(event.getMessage())
+                .isEqualTo(
+                        String.format(
+                                INEFFECTIVE_MESSAGE_FORMAT, jobVertexID, 90.0, 
5.0, "disabled"));
+        assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
     }
 
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(

Reply via email to