This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 57974b61 [FLINK-32147] Deduplicate scaling report messages
57974b61 is described below

commit 57974b61590e1976785fbdd4a40dafa975fc9521
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri May 19 16:42:03 2023 +0200

    [FLINK-32147] Deduplicate scaling report messages
---
 .../operator/autoscaler/ScalingExecutor.java       |  3 +-
 .../operator/autoscaler/ScalingExecutorTest.java   |  8 +++
 .../kubernetes/operator/utils/EventRecorder.java   | 29 ++++++++-
 .../kubernetes/operator/utils/EventUtils.java      | 11 +++-
 .../kubernetes/operator/utils/EventUtilsTest.java  | 72 +++++++++++++++++++++-
 5 files changed, 114 insertions(+), 9 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
index bc9d33e9..15d9dcba 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java
@@ -119,7 +119,8 @@ public class ScalingExecutor {
                 EventRecorder.Type.Normal,
                 EventRecorder.Reason.ScalingReport,
                 EventRecorder.Component.Operator,
-                scalingReport);
+                scalingReport,
+                "ScalingExecutor");
 
         if (!scalingEnabled) {
             return false;
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 71103e0e..b38059a4 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -242,6 +242,14 @@ public class ScalingExecutorTest {
                                         ? 
SCALING_SUMMARY_HEADER_SCALING_ENABLED
                                         : 
SCALING_SUMMARY_HEADER_SCALING_DISABLED));
         assertEquals(EventRecorder.Reason.ScalingReport.name(), 
event.getReason());
+
+        metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
+        assertEquals(
+                scalingEnabled,
+                scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, 
conf, metrics));
+        var event2 = eventCollector.events.poll();
+        assertEquals(event.getMetadata().getUid(), 
event2.getMetadata().getUid());
+        assertEquals(2, event2.getCount());
     }
 
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
index bcf73afb..28903419 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java
@@ -26,6 +26,8 @@ import 
org.apache.flink.kubernetes.operator.listener.AuditUtils;
 import io.fabric8.kubernetes.api.model.Event;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import javax.annotation.Nullable;
+
 import java.util.Collection;
 import java.util.function.BiConsumer;
 
@@ -47,7 +49,17 @@ public class EventRecorder {
             Reason reason,
             Component component,
             String message) {
-        return triggerEvent(resource, type, reason.toString(), message, 
component);
+        return triggerEvent(resource, type, reason, component, message, null);
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            Type type,
+            Reason reason,
+            Component component,
+            String message,
+            @Nullable String messageKey) {
+        return triggerEvent(resource, type, reason.toString(), message, 
component, messageKey);
     }
 
     public boolean triggerEvent(
@@ -55,7 +67,8 @@ public class EventRecorder {
             Type type,
             String reason,
             String message,
-            Component component) {
+            Component component,
+            String messageKey) {
         return EventUtils.createOrUpdateEvent(
                 client,
                 resource,
@@ -63,7 +76,17 @@ public class EventRecorder {
                 reason,
                 message,
                 component,
-                e -> eventListener.accept(resource, e));
+                e -> eventListener.accept(resource, e),
+                messageKey);
+    }
+
+    public boolean triggerEvent(
+            AbstractFlinkResource<?, ?> resource,
+            Type type,
+            String reason,
+            String message,
+            Component component) {
+        return triggerEvent(resource, type, reason, message, component, null);
     }
 
     public static EventRecorder create(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
index 1e374127..5ed40ed3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java
@@ -23,6 +23,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder;
 import io.fabric8.kubernetes.client.KubernetesClient;
 
+import javax.annotation.Nullable;
+
 import java.time.Instant;
 import java.util.function.Consumer;
 
@@ -57,8 +59,13 @@ public class EventUtils {
             String reason,
             String message,
             EventRecorder.Component component,
-            Consumer<Event> eventListener) {
-        var eventName = generateEventName(target, type, reason, message, 
component);
+            Consumer<Event> eventListener,
+            @Nullable String messageKey) {
+
+        if (messageKey == null) {
+            messageKey = message;
+        }
+        var eventName = generateEventName(target, type, reason, messageKey, 
component);
 
         var existing =
                 client.v1()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
index 60fa8470..7ee52cdc 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java
@@ -63,7 +63,8 @@ public class EventUtilsTest {
                         reason,
                         message,
                         EventRecorder.Component.Operator,
-                        consumer));
+                        consumer,
+                        null));
         var event =
                 kubernetesClient
                         .v1()
@@ -85,7 +86,8 @@ public class EventUtilsTest {
                         reason,
                         message,
                         EventRecorder.Component.Operator,
-                        consumer));
+                        consumer,
+                        null));
         event =
                 kubernetesClient
                         .v1()
@@ -105,7 +107,71 @@ public class EventUtilsTest {
                         reason,
                         null,
                         EventRecorder.Component.Operator,
-                        consumer));
+                        consumer,
+                        null));
+    }
+
+    @Test
+    public void testCreateWithMessageKey() {
+        var consumer =
+                new Consumer<Event>() {
+                    @Override
+                    public void accept(Event event) {
+                        eventConsumed = event;
+                    }
+                };
+        var flinkApp = TestUtils.buildApplicationCluster();
+        var reason = "Cleanup";
+        var eventName =
+                EventUtils.generateEventName(
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "mk",
+                        EventRecorder.Component.Operator);
+
+        Assertions.assertTrue(
+                EventUtils.createOrUpdateEvent(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message1",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk"));
+        var event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals("message1", event.getMessage());
+        Assertions.assertEquals(1, event.getCount());
+
+        Assertions.assertFalse(
+                EventUtils.createOrUpdateEvent(
+                        kubernetesClient,
+                        flinkApp,
+                        EventRecorder.Type.Warning,
+                        reason,
+                        "message2",
+                        EventRecorder.Component.Operator,
+                        consumer,
+                        "mk"));
+
+        event =
+                kubernetesClient
+                        .v1()
+                        .events()
+                        .inNamespace(flinkApp.getMetadata().getNamespace())
+                        .withName(eventName)
+                        .get();
+        Assertions.assertNotNull(event);
+        Assertions.assertEquals("message2", event.getMessage());
+        Assertions.assertEquals(2, event.getCount());
     }
 
     @Test

Reply via email to