This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 53d22d8a3e33f3971df43a29735471b7a8ccefaf
Author: Matyas Orhidi <matyas_orh...@apple.com>
AuthorDate: Fri Jan 20 16:48:29 2023 -0800

    [FLINK-30653] Trigger resource Events on autoscaler actions
---
 .../operator/autoscaler/JobAutoScaler.java         |   6 +-
 .../operator/autoscaler/JobVertexScaler.java       |  31 ++++-
 .../operator/autoscaler/ScalingExecutor.java       |  69 ++++++++---
 .../kubernetes/operator/listener/AuditUtils.java   |   4 +-
 .../AbstractFlinkResourceReconciler.java           |   2 +-
 .../kubernetes/operator/utils/EventRecorder.java   |   4 +-
 .../autoscaler/BacklogBasedScalingTest.java        |   7 +-
 .../operator/autoscaler/JobVertexScalerTest.java   | 133 +++++++++++++++++----
 .../MetricsCollectionAndEvaluationTest.java        |   7 +-
 .../operator/autoscaler/ScalingExecutorTest.java   |  47 +++++++-
 .../kubernetes/operator/utils/EventCollector.java  |   6 +-
 11 files changed, 260 insertions(+), 56 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
index d4d7f285..4f74a528 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingM
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import 
org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -172,11 +173,12 @@ public class JobAutoScaler implements Cleanup {
                         });
     }
 
-    public static JobAutoScaler create(KubernetesClient kubernetesClient) {
+    public static JobAutoScaler create(
+            KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
         return new JobAutoScaler(
                 kubernetesClient,
                 new RestApiMetricsCollector(),
                 new ScalingMetricEvaluator(),
-                new ScalingExecutor(kubernetesClient));
+                new ScalingExecutor(kubernetesClient, eventRecorder));
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index ea019080..31a0ed8d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
@@ -50,9 +52,20 @@ public class JobVertexScaler {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JobVertexScaler.class);
 
+    @VisibleForTesting
+    public static final String INNEFFECTIVE_MESSAGE_FORMAT =
+            "Skipping further scale up after ineffective previous scale up for 
%s";
+
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
+    private EventRecorder eventRecorder;
+
+    public JobVertexScaler(EventRecorder eventRecorder) {
+        this.eventRecorder = eventRecorder;
+    }
+
     public int computeScaleTargetParallelism(
+            AbstractFlinkResource<?, ?> resource,
             Configuration conf,
             JobVertexID vertex,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -99,6 +112,7 @@ public class JobVertexScaler {
 
         if (newParallelism == currentParallelism
                 || blockScalingBasedOnPastActions(
+                        resource,
                         vertex,
                         conf,
                         evaluatedMetrics,
@@ -115,6 +129,7 @@ public class JobVertexScaler {
     }
 
     private boolean blockScalingBasedOnPastActions(
+            AbstractFlinkResource<?, ?> resource,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -133,7 +148,8 @@ public class JobVertexScaler {
 
         if (currentParallelism == lastSummary.getNewParallelism() && 
lastSummary.isScaledUp()) {
             if (scaledUp) {
-                return detectIneffectiveScaleUp(vertex, conf, 
evaluatedMetrics, lastSummary);
+                return detectIneffectiveScaleUp(
+                        resource, vertex, conf, evaluatedMetrics, lastSummary);
             } else {
                 return detectImmediateScaleDownAfterScaleUp(vertex, conf, 
lastScalingTs);
             }
@@ -156,6 +172,7 @@ public class JobVertexScaler {
     }
 
     private boolean detectIneffectiveScaleUp(
+            AbstractFlinkResource<?, ?> resource,
             JobVertexID vertex,
             Configuration conf,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
@@ -180,11 +197,17 @@ public class JobVertexScaler {
             return false;
         }
 
-        // TODO: Trigger kube event
+        var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex);
+
+        eventRecorder.triggerEvent(
+                resource,
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.IneffectiveScaling,
+                EventRecorder.Component.Operator,
+                message);
 
         if 
(conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
-            LOG.info(
-                    "Skipping further scale up after ineffective previous 
scale up for {}", vertex);
+            LOG.info(message);
             return true;
         } else {
             return false;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index 98af3cd1..bda9dfe9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
@@ -60,20 +61,29 @@ public class ScalingExecutor implements Cleanup {
                             "A parallelism override map (jobVertexId -> 
parallelism) which will be used to update"
                                     + " the parallelism of the corresponding 
job vertices of submitted JobGraphs.");
 
+    public static final String SCALING_SUMMARY_ENTRY =
+            " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f 
-> %.2f | Target data rate %.2f";
+    public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
+            "Recommended parallelism change:";
+    public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = 
"Scaling vertices:";
     private static final Logger LOG = 
LoggerFactory.getLogger(ScalingExecutor.class);
 
     private final KubernetesClient kubernetesClient;
     private final JobVertexScaler jobVertexScaler;
-
+    private final EventRecorder eventRecorder;
     private Clock clock = Clock.system(ZoneId.systemDefault());
 
-    public ScalingExecutor(KubernetesClient kubernetesClient) {
-        this(kubernetesClient, new JobVertexScaler());
+    public ScalingExecutor(KubernetesClient kubernetesClient, EventRecorder 
eventRecorder) {
+        this(kubernetesClient, new JobVertexScaler(eventRecorder), 
eventRecorder);
     }
 
-    public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler 
jobVertexScaler) {
+    public ScalingExecutor(
+            KubernetesClient kubernetesClient,
+            JobVertexScaler jobVertexScaler,
+            EventRecorder eventRecorder) {
         this.kubernetesClient = kubernetesClient;
         this.jobVertexScaler = jobVertexScaler;
+        this.eventRecorder = eventRecorder;
     }
 
     public boolean scaleResource(
@@ -87,7 +97,9 @@ public class ScalingExecutor implements Cleanup {
         }
 
         var scalingHistory = scalingInformation.getScalingHistory();
-        var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, 
scalingHistory);
+        var scalingSummaries =
+                computeScalingSummary(resource, conf, evaluatedMetrics, 
scalingHistory);
+
         if (scalingSummaries.isEmpty()) {
             LOG.info("All job vertices are currently running at their target 
parallelism.");
             return false;
@@ -97,22 +109,20 @@ public class ScalingExecutor implements Cleanup {
             return false;
         }
 
-        if (!conf.get(SCALING_ENABLED)) {
+        var scalingEnabled = conf.get(SCALING_ENABLED);
+
+        var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
+        eventRecorder.triggerEvent(
+                resource,
+                EventRecorder.Type.Normal,
+                EventRecorder.Reason.ScalingReport,
+                EventRecorder.Component.Operator,
+                scalingReport);
+
+        if (!scalingEnabled) {
             return false;
         }
 
-        LOG.info("Scaling vertices:");
-        scalingSummaries.forEach(
-                (v, s) ->
-                        LOG.info(
-                                "{} | Parallelism {} -> {} | Processing 
capacity {} -> {} | Target data rate {}",
-                                v,
-                                s.getCurrentParallelism(),
-                                s.getNewParallelism(),
-                                
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
-                                
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
-                                
s.getMetrics().get(TARGET_DATA_RATE).getAverage()));
-
         setVertexParallelismOverrides(resource, evaluatedMetrics, 
scalingSummaries);
         KubernetesClientUtils.applyToStoredCr(
                 kubernetesClient,
@@ -126,6 +136,27 @@ public class ScalingExecutor implements Cleanup {
         return true;
     }
 
+    private static String scalingReport(
+            Map<JobVertexID, ScalingSummary> scalingSummaries, boolean 
scalingEnabled) {
+        StringBuilder sb =
+                new StringBuilder(
+                        scalingEnabled
+                                ? SCALING_SUMMARY_HEADER_SCALING_ENABLED
+                                : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
+        scalingSummaries.forEach(
+                (v, s) ->
+                        sb.append(
+                                String.format(
+                                        SCALING_SUMMARY_ENTRY,
+                                        v,
+                                        s.getCurrentParallelism(),
+                                        s.getNewParallelism(),
+                                        
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
+                                        
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
+                                        
s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
+        return sb.toString();
+    }
+
     private boolean stabilizationPeriodPassed(
             AbstractFlinkResource<?, ?> resource, Configuration conf) {
         var jobStatus = resource.getStatus().getJobStatus();
@@ -185,6 +216,7 @@ public class ScalingExecutor implements Cleanup {
     }
 
     private Map<JobVertexID, ScalingSummary> computeScalingSummary(
+            AbstractFlinkResource<?, ?> resource,
             Configuration conf,
             Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> 
evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory) {
@@ -196,6 +228,7 @@ public class ScalingExecutor implements Cleanup {
                             (int) 
metrics.get(ScalingMetric.PARALLELISM).getCurrent();
                     var newParallelism =
                             jobVertexScaler.computeScaleTargetParallelism(
+                                    resource,
                                     conf,
                                     v,
                                     metrics,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
index a0a85254..e590580e 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.listener;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener;
 import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
@@ -52,7 +53,8 @@ public class AuditUtils {
                         : status.getError());
     }
 
-    private static String format(@NonNull Event event) {
+    @VisibleForTesting
+    public static String format(@NonNull Event event) {
         return String.format(
                 ">>> Event  | %-7s | %-15s | %s",
                 event.getType().equals("Normal") ? "Info" : event.getType(),
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 249d2f80..79e5e72d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -90,7 +90,7 @@ public abstract class AbstractFlinkResourceReconciler<
         this.kubernetesClient = kubernetesClient;
         this.eventRecorder = eventRecorder;
         this.statusRecorder = statusRecorder;
-        this.resourceScaler = JobAutoScaler.create(kubernetesClient);
+        this.resourceScaler = JobAutoScaler.create(kubernetesClient, 
eventRecorder);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index 81baf9b6..d3a3a51c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -128,6 +128,8 @@ public class EventRecorder {
         Missing,
         ValidationError,
         RecoverDeployment,
-        RestartUnhealthyJob
+        RestartUnhealthyJob,
+        ScalingReport,
+        IneffectiveScaling
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index af8662b6..b0e48dd3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 
@@ -69,7 +71,10 @@ public class BacklogBasedScalingTest extends 
OperatorTestBase {
     @BeforeEach
     public void setup() {
         evaluator = new ScalingMetricEvaluator();
-        scalingExecutor = new ScalingExecutor(kubernetesClient);
+        scalingExecutor =
+                new ScalingExecutor(
+                        kubernetesClient,
+                        new EventRecorder(kubernetesClient, new 
EventCollector()));
 
         app = TestUtils.buildApplicationCluster();
         app.getMetadata().setGeneration(1L);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index 34c2b8cc..6e933d1c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -18,11 +18,17 @@
 package org.apache.flink.kubernetes.operator.autoscaler;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.Assert;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -35,19 +41,29 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
 
+import static 
org.apache.flink.kubernetes.operator.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for vertex parallelism scaler logic. */
+@EnableKubernetesMockClient(crud = true)
 public class JobVertexScalerTest {
 
     private JobVertexScaler vertexScaler;
     private Configuration conf;
 
+    private KubernetesClient kubernetesClient;
+    private EventCollector eventCollector;
+
+    private FlinkDeployment flinkDep;
+
     @BeforeEach
     public void setup() {
-        vertexScaler = new JobVertexScaler();
+        flinkDep = TestUtils.buildApplicationCluster();
+        kubernetesClient.resource(flinkDep).createOrReplace();
+        eventCollector = new EventCollector();
+        vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, 
eventCollector));
         conf = new Configuration();
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
@@ -60,55 +76,55 @@ public class JobVertexScalerTest {
         assertEquals(
                 5,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 50, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
                 8,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 50, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 50, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 80, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 80, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         assertEquals(
                 8,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 60, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 60, 100), 
Collections.emptySortedMap()));
 
         assertEquals(
                 8,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 59, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 59, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5);
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 40), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(2, 100, 40), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6);
         assertEquals(
                 4,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(2, 100, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(2, 100, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5);
         assertEquals(
                 5,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 10, 100), 
Collections.emptySortedMap()));
 
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6);
         assertEquals(
                 4,
                 vertexScaler.computeScaleTargetParallelism(
-                        conf, op, evaluated(10, 10, 100), 
Collections.emptySortedMap()));
+                        flinkDep, conf, op, evaluated(10, 10, 100), 
Collections.emptySortedMap()));
     }
 
     @Test
@@ -152,6 +168,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 5,
                 vertexScaler.computeScaleTargetParallelism(
+                        flinkDep,
                         conf,
                         new JobVertexID(),
                         evaluated(10, 100, 500),
@@ -165,6 +182,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
+                        flinkDep,
                         conf,
                         new JobVertexID(),
                         evaluated(10, 500, 100),
@@ -181,24 +199,32 @@ public class JobVertexScalerTest {
 
         var evaluated = evaluated(5, 100, 50);
         var history = new TreeMap<Instant, ScalingSummary>();
-        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
 
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
 
         // Should not allow scale back down immediately
         evaluated = evaluated(10, 50, 100);
-        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
 
         // Pass some time...
         clock = Clock.offset(Clock.systemDefaultZone(), 
Duration.ofSeconds(61));
         vertexScaler.setClock(clock);
 
-        assertEquals(5, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                5,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
 
         // Allow immediate scale up
         evaluated = evaluated(5, 100, 50);
-        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
     }
 
@@ -210,58 +236,115 @@ public class JobVertexScalerTest {
 
         var evaluated = evaluated(5, 100, 50);
         var history = new TreeMap<Instant, ScalingSummary>();
-        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
         // Allow to scale higher if scaling was effective (80%)
         evaluated = evaluated(10, 180, 90);
-        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
 
         // Detect ineffective scaling, less than 5% of target increase 
(instead of 90 -> 180, only
         // 90 -> 94. Do not try to scale above 20
         evaluated = evaluated(20, 180, 94);
-        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Still considered ineffective (less than <10%)
         evaluated = evaluated(20, 180, 98);
-        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Allow scale up if current parallelism doesnt match last (user 
rescaled manually)
         evaluated = evaluated(10, 180, 90);
-        assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
 
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
-        assertEquals(36, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                36,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Ineffective but detection is turned off
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
false);
         evaluated = evaluated(20, 180, 90);
-        assertEquals(40, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                40,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
 
         // Allow scale down even if ineffective
         evaluated = evaluated(20, 45, 90);
-        assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, 
evaluated, history));
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, 
evaluated, history));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
     }
 
+    @Test
+    public void testSendingIneffectiveScalingEvents() {
+        var jobVertexID = new JobVertexID();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
+        conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
+
+        var evaluated = evaluated(5, 100, 50);
+        var history = new TreeMap<Instant, ScalingSummary>();
+        assertEquals(
+                10,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, jobVertexID, evaluated, history));
+        assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
+        history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
+
+        // Effective scale, no events triggered
+        evaluated = evaluated(10, 180, 90);
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, jobVertexID, evaluated, history));
+        assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
+        history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
+        assertEquals(0, eventCollector.events.size());
+
+        // Ineffective scale, an event is triggered
+        evaluated = evaluated(20, 180, 95);
+        assertEquals(
+                20,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, jobVertexID, evaluated, history));
+        
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
+        assertEquals(1, eventCollector.events.size());
+        var event = eventCollector.events.poll();
+        assertEquals(String.format(INNEFFECTIVE_MESSAGE_FORMAT, jobVertexID), 
event.getMessage());
+        assertEquals(EventRecorder.Reason.IneffectiveScaling.name(), 
event.getReason());
+    }
+
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
-            int parallelism, double target, double procRate) {
+            int parallelism, double targetDataRate, double trueProcessingRate) 
{
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
         metrics.put(ScalingMetric.PARALLELISM, 
EvaluatedScalingMetric.of(parallelism));
         metrics.put(ScalingMetric.MAX_PARALLELISM, 
EvaluatedScalingMetric.of(720));
-        metrics.put(ScalingMetric.TARGET_DATA_RATE, new 
EvaluatedScalingMetric(target, target));
+        metrics.put(
+                ScalingMetric.TARGET_DATA_RATE,
+                new EvaluatedScalingMetric(targetDataRate, targetDataRate));
         metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, 
EvaluatedScalingMetric.of(0.));
         metrics.put(
-                ScalingMetric.TRUE_PROCESSING_RATE, new 
EvaluatedScalingMetric(procRate, procRate));
+                ScalingMetric.TRUE_PROCESSING_RATE,
+                new EvaluatedScalingMetric(trueProcessingRate, 
trueProcessingRate));
         ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf);
         return metrics;
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index d12e6bc3..b42137dd 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology;
 import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
@@ -81,7 +83,10 @@ public class MetricsCollectionAndEvaluationTest {
     @BeforeEach
     public void setup() {
         evaluator = new ScalingMetricEvaluator();
-        scalingExecutor = new ScalingExecutor(kubernetesClient);
+        scalingExecutor =
+                new ScalingExecutor(
+                        kubernetesClient,
+                        new EventRecorder(kubernetesClient, new 
EventCollector()));
         service = new TestingFlinkService();
 
         app = TestUtils.buildApplicationCluster();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 7f9fd458..447c262d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -25,12 +25,16 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
 import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.kubernetes.operator.utils.EventCollector;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Clock;
 import java.time.Duration;
@@ -40,6 +44,10 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static 
org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
+import static 
org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -49,13 +57,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 public class ScalingExecutorTest {
 
     private ScalingExecutor scalingDecisionExecutor;
+
+    private EventCollector eventCollector;
     private Configuration conf;
     private KubernetesClient kubernetesClient;
     private FlinkDeployment flinkDep;
 
     @BeforeEach
     public void setup() {
-        scalingDecisionExecutor = new ScalingExecutor(kubernetesClient);
+        eventCollector = new EventCollector();
+        scalingDecisionExecutor =
+                new ScalingExecutor(
+                        kubernetesClient, new EventRecorder(kubernetesClient, 
eventCollector));
         conf = new Configuration();
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
@@ -170,6 +183,38 @@ public class ScalingExecutorTest {
         
assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, 
scalingSummary));
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testScalingEvents(boolean scalingEnabled) {
+        var jobVertexID = new JobVertexID();
+        conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
+        var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
+        var scalingInfo = new AutoScalerInfo(new HashMap<>());
+        assertEquals(
+                scalingEnabled,
+                scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, 
conf, metrics));
+        assertEquals(1, eventCollector.events.size());
+        var event = eventCollector.events.poll();
+        assertTrue(
+                event.getMessage()
+                        .contains(
+                                String.format(
+                                        SCALING_SUMMARY_ENTRY,
+                                        jobVertexID,
+                                        1,
+                                        2,
+                                        100.0,
+                                        157.0,
+                                        110.0)));
+        assertTrue(
+                event.getMessage()
+                        .contains(
+                                scalingEnabled
+                                        ? 
SCALING_SUMMARY_HEADER_SCALING_ENABLED
+                                        : 
SCALING_SUMMARY_HEADER_SCALING_DISABLED));
+        assertEquals(EventRecorder.Reason.ScalingReport.name(), 
event.getReason());
+    }
+
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
             int parallelism, double target, double procRate, double 
catchupRate) {
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
index 76791dcf..762b0376 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java
@@ -19,8 +19,11 @@
 package org.apache.flink.kubernetes.operator.utils;
 
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
+import org.apache.flink.kubernetes.operator.listener.AuditUtils;
 
 import io.fabric8.kubernetes.api.model.Event;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.LinkedList;
 import java.util.Queue;
@@ -28,11 +31,12 @@ import java.util.function.BiConsumer;
 
 /** Simple consumer that collects triggered events for tests. */
 public class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, 
Event> {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(EventCollector.class);
     public final Queue<Event> events = new LinkedList<>();
 
     @Override
     public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, 
Event event) {
+        LOG.info(AuditUtils.format(event));
         events.add(event);
     }
 }


Reply via email to