This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 3899050ae09ee0590d9810a858703d8d6a9ca602 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Thu Feb 16 12:04:27 2023 +0100 [FLINK-31218] Improve health probe to only detect newly unhealthy informers --- .../kubernetes/operator/health/HealthProbe.java | 47 +++++-- .../operator/health/InformerHealthSummary.java | 61 +++++++++ .../operator/health/InformerIdentifier.java | 29 +++++ .../operator/health/HealthProbeTest.java | 140 +++++++++++++++++++-- 4 files changed, 255 insertions(+), 22 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java index 8ce574fa..1b5d561f 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java @@ -19,13 +19,12 @@ package org.apache.flink.kubernetes.operator.health; import io.javaoperatorsdk.operator.RuntimeInfo; import lombok.Getter; -import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Set; /** Flink operator health probe. */ public enum HealthProbe { @@ -33,9 +32,9 @@ public enum HealthProbe { private static final Logger LOG = LoggerFactory.getLogger(HealthProbe.class); - private final AtomicBoolean isHealthy = new AtomicBoolean(true); + @Getter private RuntimeInfo runtimeInfo; - @Setter @Getter private RuntimeInfo runtimeInfo; + private InformerHealthSummary previousInformerHealthSummary; private final List<CanaryResourceManager<?>> canaryResourceManagers = new ArrayList<>(); @@ -43,20 +42,29 @@ public enum HealthProbe { canaryResourceManagers.add(canaryResourceManager); } - public boolean isHealthy() { - if (!isHealthy.get()) { - return false; - } + public void setRuntimeInfo(RuntimeInfo runtimeInfo) { + this.runtimeInfo = runtimeInfo; + previousInformerHealthSummary = InformerHealthSummary.fromRuntimeInfo(runtimeInfo); + LOG.info( + "Initially unhealthy informers: {}", + previousInformerHealthSummary.getUnhealthyInformers()); + } + public boolean isHealthy() { if (runtimeInfo != null) { - LOG.debug("Checking operator health"); - if (!runtimeInfo.allEventSourcesAreHealthy()) { - LOG.error("Unhealthy event sources: {}", runtimeInfo.unhealthyEventSources()); + LOG.debug("Checking event source health"); + var healthSummary = InformerHealthSummary.fromRuntimeInfo(runtimeInfo); + if (!healthSummary.isAnyHealthy()) { + LOG.error("All informers are unhealthy"); + return false; + } else if (anyInformerBecameUnhealthy(healthSummary.getUnhealthyInformers())) { return false; + } else { + previousInformerHealthSummary = healthSummary; } if (!runtimeInfo.isStarted()) { - LOG.error("Operator not running"); + LOG.error("Operator is not running"); return false; } } @@ -67,6 +75,21 @@ public enum HealthProbe { return false; } } + return true; } + + private boolean anyInformerBecameUnhealthy(Set<InformerIdentifier> unhealthyInformers) { + boolean unhealthy = false; + for (InformerIdentifier unhealthyInformer : unhealthyInformers) { + if (!previousInformerHealthSummary + .getUnhealthyInformers() + .contains(unhealthyInformer)) { + LOG.error("Informer became unhealthy: {}", unhealthyInformer); + unhealthy = true; + } + } + + return unhealthy; + } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java new file mode 100644 index 00000000..6af0925f --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.health; + +import io.javaoperatorsdk.operator.RuntimeInfo; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; +import lombok.Value; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** Operator informer health summary. */ +@Value +public class InformerHealthSummary { + + boolean anyHealthy; + Set<InformerIdentifier> unhealthyInformers; + + public static InformerHealthSummary fromRuntimeInfo(RuntimeInfo runtimeInfo) { + var newUnHealthy = new HashSet<InformerIdentifier>(); + boolean anyHealthy = false; + + for (var controllerEntry : + runtimeInfo.unhealthyInformerWrappingEventSourceHealthIndicator().entrySet()) { + for (var eventSourceEntry : controllerEntry.getValue().entrySet()) { + Map<String, InformerHealthIndicator> informers = + eventSourceEntry.getValue().informerHealthIndicators(); + for (var informerEntry : informers.entrySet()) { + if (informerEntry.getValue().getStatus() == Status.HEALTHY) { + anyHealthy = true; + } else { + newUnHealthy.add( + new InformerIdentifier( + controllerEntry.getKey(), + eventSourceEntry.getKey(), + informerEntry.getKey())); + } + } + } + } + + return new InformerHealthSummary(anyHealthy || newUnHealthy.isEmpty(), newUnHealthy); + } +} diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java new file mode 100644 index 00000000..65166c82 --- /dev/null +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.operator.health; + +import lombok.Value; + +/** Operator informer identifier. */ +@Value +public class InformerIdentifier { + + String controllerName; + String eventSourceName; + String informerName; +} diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java index dfdd2dff..91385252 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java @@ -31,11 +31,18 @@ import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.RuntimeInfo; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.junit.jupiter.api.Test; import java.net.HttpURLConnection; import java.net.URL; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; @@ -74,11 +81,11 @@ public class HealthProbeTest { } @Test - public void testHealthProbe() { + public void testHealthProbeInfomers() { var isRunning = new AtomicBoolean(false); - var isHealthy = new AtomicBoolean(false); - - HealthProbe.INSTANCE.setRuntimeInfo( + var unhealthyEventSources = + new HashMap<String, Map<String, InformerWrappingEventSourceHealthIndicator>>(); + var runtimeInfo = new RuntimeInfo(new Operator(client)) { @Override public boolean isStarted() { @@ -86,19 +93,85 @@ public class HealthProbeTest { } @Override - public boolean allEventSourcesAreHealthy() { - return isHealthy.get(); + public Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> + unhealthyInformerWrappingEventSourceHealthIndicator() { + return unhealthyEventSources; } - }); + }; + + // Test if a new event source becomes unhealthy + HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo); assertFalse(HealthProbe.INSTANCE.isHealthy()); isRunning.set(true); + assertTrue(HealthProbe.INSTANCE.isHealthy()); + unhealthyEventSources.put( + "c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY)))); assertFalse(HealthProbe.INSTANCE.isHealthy()); - isHealthy.set(true); + unhealthyEventSources.clear(); + assertTrue(HealthProbe.INSTANCE.isHealthy()); + + // Test if we detect when there is an unhealthy at start + unhealthyEventSources.put( + "c1", + Map.of( + "e1", + informerHealthIndicator( + Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY)))); + HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo); assertTrue(HealthProbe.INSTANCE.isHealthy()); - isRunning.set(false); + unhealthyEventSources.put( + "c1", + Map.of( + "e1", + informerHealthIndicator( + Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY)), + "e2", + informerHealthIndicator(Map.of("i3", Status.UNHEALTHY)))); assertFalse(HealthProbe.INSTANCE.isHealthy()); - isRunning.set(true); + assertFalse(HealthProbe.INSTANCE.isHealthy()); + unhealthyEventSources.put( + "c1", + Map.of( + "e1", + informerHealthIndicator( + Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY)), + "e2", + informerHealthIndicator(Map.of("i3", Status.HEALTHY)))); + assertTrue(HealthProbe.INSTANCE.isHealthy()); + unhealthyEventSources.put( + "c1", + Map.of( + "e1", + informerHealthIndicator( + Map.of("i1", Status.UNHEALTHY, "i2", Status.UNHEALTHY)))); + assertFalse(HealthProbe.INSTANCE.isHealthy()); + unhealthyEventSources.clear(); + assertTrue(HealthProbe.INSTANCE.isHealthy()); + + // All informers unhealthy at start: + unhealthyEventSources.put( + "c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY)))); + HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo); + assertFalse(HealthProbe.INSTANCE.isHealthy()); + } + + @Test + public void testHealthProbeCanary() { + var runtimeInfo = + new RuntimeInfo(new Operator(client)) { + @Override + public boolean isStarted() { + return true; + } + + @Override + public Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> + unhealthyInformerWrappingEventSourceHealthIndicator() { + return Collections.emptyMap(); + } + }; + HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo); var canaryManager = new CanaryResourceManager<FlinkDeployment>( @@ -142,4 +215,51 @@ public class HealthProbeTest { connection.connect(); return connection.getResponseCode() == OK.code(); } + + private static InformerWrappingEventSourceHealthIndicator informerHealthIndicator( + Map<String, Status> informerStatuses) { + Map<String, InformerHealthIndicator> informers = new HashMap<>(); + informerStatuses.forEach( + (n, s) -> + informers.put( + n, + new InformerHealthIndicator() { + @Override + public boolean hasSynced() { + return false; + } + + @Override + public boolean isWatching() { + return false; + } + + @Override + public boolean isRunning() { + return false; + } + + @Override + public Status getStatus() { + return s; + } + + @Override + public String getTargetNamespace() { + return null; + } + })); + + return new InformerWrappingEventSourceHealthIndicator() { + @Override + public Map<String, InformerHealthIndicator> informerHealthIndicators() { + return informers; + } + + @Override + public ResourceConfiguration getInformerConfiguration() { + return null; + } + }; + } }