This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 57974b61 [FLINK-32147] Deduplicate scaling report messages 57974b61 is described below commit 57974b61590e1976785fbdd4a40dafa975fc9521 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri May 19 16:42:03 2023 +0200 [FLINK-32147] Deduplicate scaling report messages --- .../operator/autoscaler/ScalingExecutor.java | 3 +- .../operator/autoscaler/ScalingExecutorTest.java | 8 +++ .../kubernetes/operator/utils/EventRecorder.java | 29 ++++++++- .../kubernetes/operator/utils/EventUtils.java | 11 +++- .../kubernetes/operator/utils/EventUtilsTest.java | 72 +++++++++++++++++++++- 5 files changed, 114 insertions(+), 9 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index bc9d33e9..15d9dcba 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -119,7 +119,8 @@ public class ScalingExecutor { EventRecorder.Type.Normal, EventRecorder.Reason.ScalingReport, EventRecorder.Component.Operator, - scalingReport); + scalingReport, + "ScalingExecutor"); if (!scalingEnabled) { return false; diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index 71103e0e..b38059a4 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -242,6 +242,14 @@ public class ScalingExecutorTest { ? SCALING_SUMMARY_HEADER_SCALING_ENABLED : SCALING_SUMMARY_HEADER_SCALING_DISABLED)); assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason()); + + metrics = Map.of(jobVertexID, evaluated(1, 110, 101)); + assertEquals( + scalingEnabled, + scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + var event2 = eventCollector.events.poll(); + assertEquals(event.getMetadata().getUid(), event2.getMetadata().getUid()); + assertEquals(2, event2.getCount()); } private Map<ScalingMetric, EvaluatedScalingMetric> evaluated( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index bcf73afb..28903419 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -26,6 +26,8 @@ import org.apache.flink.kubernetes.operator.listener.AuditUtils; import io.fabric8.kubernetes.api.model.Event; import io.fabric8.kubernetes.client.KubernetesClient; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.function.BiConsumer; @@ -47,7 +49,17 @@ public class EventRecorder { Reason reason, Component component, String message) { - return triggerEvent(resource, type, reason.toString(), message, component); + return triggerEvent(resource, type, reason, component, message, null); + } + + public boolean triggerEvent( + AbstractFlinkResource<?, ?> resource, + Type type, + Reason reason, + Component component, + String message, + @Nullable String messageKey) { + return triggerEvent(resource, type, reason.toString(), message, component, messageKey); } public boolean triggerEvent( @@ -55,7 +67,8 @@ public class EventRecorder { Type type, String reason, String message, - Component component) { + Component component, + String messageKey) { return EventUtils.createOrUpdateEvent( client, resource, @@ -63,7 +76,17 @@ public class EventRecorder { reason, message, component, - e -> eventListener.accept(resource, e)); + e -> eventListener.accept(resource, e), + messageKey); + } + + public boolean triggerEvent( + AbstractFlinkResource<?, ?> resource, + Type type, + String reason, + String message, + Component component) { + return triggerEvent(resource, type, reason, message, component, null); } public static EventRecorder create( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java index 1e374127..5ed40ed3 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventUtils.java @@ -23,6 +23,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectReferenceBuilder; import io.fabric8.kubernetes.client.KubernetesClient; +import javax.annotation.Nullable; + import java.time.Instant; import java.util.function.Consumer; @@ -57,8 +59,13 @@ public class EventUtils { String reason, String message, EventRecorder.Component component, - Consumer<Event> eventListener) { - var eventName = generateEventName(target, type, reason, message, component); + Consumer<Event> eventListener, + @Nullable String messageKey) { + + if (messageKey == null) { + messageKey = message; + } + var eventName = generateEventName(target, type, reason, messageKey, component); var existing = client.v1() diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java index 60fa8470..7ee52cdc 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventUtilsTest.java @@ -63,7 +63,8 @@ public class EventUtilsTest { reason, message, EventRecorder.Component.Operator, - consumer)); + consumer, + null)); var event = kubernetesClient .v1() @@ -85,7 +86,8 @@ public class EventUtilsTest { reason, message, EventRecorder.Component.Operator, - consumer)); + consumer, + null)); event = kubernetesClient .v1() @@ -105,7 +107,71 @@ public class EventUtilsTest { reason, null, EventRecorder.Component.Operator, - consumer)); + consumer, + null)); + } + + @Test + public void testCreateWithMessageKey() { + var consumer = + new Consumer<Event>() { + @Override + public void accept(Event event) { + eventConsumed = event; + } + }; + var flinkApp = TestUtils.buildApplicationCluster(); + var reason = "Cleanup"; + var eventName = + EventUtils.generateEventName( + flinkApp, + EventRecorder.Type.Warning, + reason, + "mk", + EventRecorder.Component.Operator); + + Assertions.assertTrue( + EventUtils.createOrUpdateEvent( + kubernetesClient, + flinkApp, + EventRecorder.Type.Warning, + reason, + "message1", + EventRecorder.Component.Operator, + consumer, + "mk")); + var event = + kubernetesClient + .v1() + .events() + .inNamespace(flinkApp.getMetadata().getNamespace()) + .withName(eventName) + .get(); + Assertions.assertNotNull(event); + Assertions.assertEquals("message1", event.getMessage()); + Assertions.assertEquals(1, event.getCount()); + + Assertions.assertFalse( + EventUtils.createOrUpdateEvent( + kubernetesClient, + flinkApp, + EventRecorder.Type.Warning, + reason, + "message2", + EventRecorder.Component.Operator, + consumer, + "mk")); + + event = + kubernetesClient + .v1() + .events() + .inNamespace(flinkApp.getMetadata().getNamespace()) + .withName(eventName) + .get(); + Assertions.assertNotNull(event); + Assertions.assertEquals("message2", event.getMessage()); + Assertions.assertEquals(2, event.getCount()); } @Test