This is an automated email from the ASF dual-hosted git repository. mxm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 41e235b6 [FLINK-31827] Discard old format scaling history (#574) 41e235b6 is described below commit 41e235b60c62cf3fb612de0131ef41d77ef0b0b8 Author: Maximilian Michels <m...@apache.org> AuthorDate: Mon Apr 24 21:41:32 2023 +0200 [FLINK-31827] Discard old format scaling history (#574) We recently changed the format for the metric history but this also affects the scaling history because it contains new / old scaling metrics. --- .../operator/autoscaler/AutoScalerInfo.java | 22 +++++++++++++++------- .../operator/autoscaler/AutoScalerInfoTest.java | 19 ++++++++++++++++++- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java index 68bfc731..9ef01065 100644 --- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java +++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfo.java @@ -27,7 +27,7 @@ import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.JacksonException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; @@ -95,9 +95,10 @@ public class AutoScalerInfo { try { return YAML_MAPPER.readValue(decompress(historyYaml), new TypeReference<>() {}); - } catch (JsonProcessingException e) { + } catch (JacksonException e) { LOG.error( "Could not deserialize metric history, possibly the format changed. 
Discarding..."); + configMap.getData().remove(COLLECTED_METRICS_KEY); return new TreeMap<>(); } } @@ -105,7 +106,6 @@ public class AutoScalerInfo { @SneakyThrows public void updateMetricHistory( Instant jobUpdateTs, SortedMap<Instant, CollectedMetrics> history) { - configMap .getData() .put(COLLECTED_METRICS_KEY, compress(YAML_MAPPER.writeValueAsString(history))); @@ -137,10 +137,18 @@ public class AutoScalerInfo { return scalingHistory; } var yaml = decompress(configMap.getData().get(SCALING_HISTORY_KEY)); - scalingHistory = - yaml == null - ? new HashMap<>() - : YAML_MAPPER.readValue(yaml, new TypeReference<>() {}); + if (yaml == null) { + scalingHistory = new HashMap<>(); + return scalingHistory; + } + try { + scalingHistory = YAML_MAPPER.readValue(yaml, new TypeReference<>() {}); + } catch (JacksonException e) { + LOG.error( + "Could not deserialize scaling history, possibly the format changed. Discarding..."); + configMap.getData().remove(SCALING_HISTORY_KEY); + scalingHistory = new HashMap<>(); + } return scalingHistory; } diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java index 8f894643..ad78d761 100644 --- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java +++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/AutoScalerInfoTest.java @@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.runtime.jobgraph.JobVertexID; import com.fasterxml.jackson.core.JsonProcessingException; +import io.fabric8.kubernetes.api.model.ConfigMap; import org.junit.jupiter.api.Test; import java.time.Duration; @@ -40,6 +41,7 @@ import java.util.TreeMap; import static 
org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for AutoScalerInfo. */ @@ -205,7 +207,22 @@ public class AutoScalerInfoTest { @Test public void testDiscardInvalidHistory() { - var info = new AutoScalerInfo(Map.of(AutoScalerInfo.COLLECTED_METRICS_KEY, "invalid")); + ConfigMap configMap = new ConfigMap(); + configMap.setData( + new HashMap<>( + Map.of( + AutoScalerInfo.COLLECTED_METRICS_KEY, + "invalid", + AutoScalerInfo.SCALING_HISTORY_KEY, + "invalid2"))); + + var info = new AutoScalerInfo(configMap); + assertEquals(2, configMap.getData().size()); + assertEquals(new TreeMap<>(), info.getMetricHistory()); + assertNull(configMap.getData().get(AutoScalerInfo.COLLECTED_METRICS_KEY)); + + assertEquals(new TreeMap<>(), info.getScalingHistory()); + assertNull(configMap.getData().get(AutoScalerInfo.SCALING_HISTORY_KEY)); } }