This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 41e235b6 [FLINK-31827] Discard old format scaling history (#574)
41e235b6 is described below

commit 41e235b60c62cf3fb612de0131ef41d77ef0b0b8
Author: Maximilian Michels <m...@apache.org>
AuthorDate: Mon Apr 24 21:41:32 2023 +0200

    [FLINK-31827] Discard old format scaling history (#574)
    
    We recently changed the format for the metric history, but this also affects the
    scaling history because it contains new / old scaling metrics.
---
 .../operator/autoscaler/AutoScalerInfo.java        | 22 +++++++++++++++-------
 .../operator/autoscaler/AutoScalerInfoTest.java    | 19 ++++++++++++++++++-
 2 files changed, 33 insertions(+), 8 deletions(-)

diff --git 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
index 68bfc731..9ef01065 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java
@@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.util.Preconditions;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.JacksonException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -95,9 +95,10 @@ public class AutoScalerInfo {
 
         try {
             return YAML_MAPPER.readValue(decompress(historyYaml), new 
TypeReference<>() {});
-        } catch (JsonProcessingException e) {
+        } catch (JacksonException e) {
             LOG.error(
                     "Could not deserialize metric history, possibly the format 
changed. Discarding...");
+            configMap.getData().remove(COLLECTED_METRICS_KEY);
             return new TreeMap<>();
         }
     }
@@ -105,7 +106,6 @@ public class AutoScalerInfo {
     @SneakyThrows
     public void updateMetricHistory(
             Instant jobUpdateTs, SortedMap<Instant, CollectedMetrics> history) 
{
-
         configMap
                 .getData()
                 .put(COLLECTED_METRICS_KEY, 
compress(YAML_MAPPER.writeValueAsString(history)));
@@ -137,10 +137,18 @@ public class AutoScalerInfo {
             return scalingHistory;
         }
         var yaml = decompress(configMap.getData().get(SCALING_HISTORY_KEY));
-        scalingHistory =
-                yaml == null
-                        ? new HashMap<>()
-                        : YAML_MAPPER.readValue(yaml, new TypeReference<>() 
{});
+        if (yaml == null) {
+            scalingHistory = new HashMap<>();
+            return scalingHistory;
+        }
+        try {
+            scalingHistory = YAML_MAPPER.readValue(yaml, new TypeReference<>() 
{});
+        } catch (JacksonException e) {
+            LOG.error(
+                    "Could not deserialize scaling history, possibly the 
format changed. Discarding...");
+            configMap.getData().remove(SCALING_HISTORY_KEY);
+            scalingHistory = new HashMap<>();
+        }
         return scalingHistory;
     }
 
diff --git 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
index 8f894643..ad78d761 100644
--- 
a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
+++ 
b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
+import io.fabric8.kubernetes.api.model.ConfigMap;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -40,6 +41,7 @@ import java.util.TreeMap;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Test for AutoScalerInfo. */
@@ -205,7 +207,22 @@ public class AutoScalerInfoTest {
 
     @Test
     public void testDiscardInvalidHistory() {
-        var info = new 
AutoScalerInfo(Map.of(AutoScalerInfo.COLLECTED_METRICS_KEY, "invalid"));
+        ConfigMap configMap = new ConfigMap();
+        configMap.setData(
+                new HashMap<>(
+                        Map.of(
+                                AutoScalerInfo.COLLECTED_METRICS_KEY,
+                                "invalid",
+                                AutoScalerInfo.SCALING_HISTORY_KEY,
+                                "invalid2")));
+
+        var info = new AutoScalerInfo(configMap);
+        assertEquals(2, configMap.getData().size());
+
         assertEquals(new TreeMap<>(), info.getMetricHistory());
+        
assertNull(configMap.getData().get(AutoScalerInfo.COLLECTED_METRICS_KEY));
+
+        assertEquals(new TreeMap<>(), info.getScalingHistory());
+        
assertNull(configMap.getData().get(AutoScalerInfo.SCALING_HISTORY_KEY));
     }
 }

Reply via email to