This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit 3899050ae09ee0590d9810a858703d8d6a9ca602
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Thu Feb 16 12:04:27 2023 +0100

    [FLINK-31218] Improve health probe to only detect newly unhealthy informers
---
 .../kubernetes/operator/health/HealthProbe.java    |  47 +++++--
 .../operator/health/InformerHealthSummary.java     |  61 +++++++++
 .../operator/health/InformerIdentifier.java        |  29 +++++
 .../operator/health/HealthProbeTest.java           | 140 +++++++++++++++++++--
 4 files changed, 255 insertions(+), 22 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
index 8ce574fa..1b5d561f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/HealthProbe.java
@@ -19,13 +19,12 @@ package org.apache.flink.kubernetes.operator.health;
 
 import io.javaoperatorsdk.operator.RuntimeInfo;
 import lombok.Getter;
-import lombok.Setter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Set;
 
 /** Flink operator health probe. */
 public enum HealthProbe {
@@ -33,9 +32,9 @@ public enum HealthProbe {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HealthProbe.class);
 
-    private final AtomicBoolean isHealthy = new AtomicBoolean(true);
+    @Getter private RuntimeInfo runtimeInfo;
 
-    @Setter @Getter private RuntimeInfo runtimeInfo;
+    private InformerHealthSummary previousInformerHealthSummary;
 
     private final List<CanaryResourceManager<?>> canaryResourceManagers = new 
ArrayList<>();
 
@@ -43,20 +42,29 @@ public enum HealthProbe {
         canaryResourceManagers.add(canaryResourceManager);
     }
 
-    public boolean isHealthy() {
-        if (!isHealthy.get()) {
-            return false;
-        }
+    public void setRuntimeInfo(RuntimeInfo runtimeInfo) {
+        this.runtimeInfo = runtimeInfo;
+        previousInformerHealthSummary = 
InformerHealthSummary.fromRuntimeInfo(runtimeInfo);
+        LOG.info(
+                "Initially unhealthy informers: {}",
+                previousInformerHealthSummary.getUnhealthyInformers());
+    }
 
+    public boolean isHealthy() {
         if (runtimeInfo != null) {
-            LOG.debug("Checking operator health");
-            if (!runtimeInfo.allEventSourcesAreHealthy()) {
-                LOG.error("Unhealthy event sources: {}", 
runtimeInfo.unhealthyEventSources());
+            LOG.debug("Checking event source health");
+            var healthSummary = 
InformerHealthSummary.fromRuntimeInfo(runtimeInfo);
+            if (!healthSummary.isAnyHealthy()) {
+                LOG.error("All informers are unhealthy");
+                return false;
+            } else if 
(anyInformerBecameUnhealthy(healthSummary.getUnhealthyInformers())) {
                 return false;
+            } else {
+                previousInformerHealthSummary = healthSummary;
             }
 
             if (!runtimeInfo.isStarted()) {
-                LOG.error("Operator not running");
+                LOG.error("Operator is not running");
                 return false;
             }
         }
@@ -67,6 +75,21 @@ public enum HealthProbe {
                 return false;
             }
         }
+
         return true;
     }
+
+    /**
+     * Compares the currently unhealthy informers with the stored previous health summary and
+     * logs each informer that was not unhealthy before.
+     *
+     * @param unhealthyInformers informers currently reported as unhealthy
+     * @return {@code true} if at least one informer is unhealthy now but was not previously
+     */
+    private boolean anyInformerBecameUnhealthy(Set<InformerIdentifier> unhealthyInformers) {
+        var newlyUnhealthyFound = false;
+        for (var informer : unhealthyInformers) {
+            var alreadyKnown =
+                    previousInformerHealthSummary.getUnhealthyInformers().contains(informer);
+            if (alreadyKnown) {
+                continue;
+            }
+            LOG.error("Informer became unhealthy: {}", informer);
+            newlyUnhealthyFound = true;
+        }
+        return newlyUnhealthyFound;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java
new file mode 100644
index 00000000..6af0925f
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerHealthSummary.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import io.javaoperatorsdk.operator.RuntimeInfo;
+import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
+import io.javaoperatorsdk.operator.health.Status;
+import lombok.Value;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of operator informer health, derived from the JOSDK {@link RuntimeInfo} health
+ * indicators.
+ */
+@Value
+public class InformerHealthSummary {
+
+    /**
+     * True if any inspected informer reported {@link Status#HEALTHY}, or if no unhealthy informer
+     * was found at all.
+     */
+    boolean anyHealthy;
+
+    /** Identifiers of every inspected informer whose status is not {@link Status#HEALTHY}. */
+    Set<InformerIdentifier> unhealthyInformers;
+
+    /**
+     * Builds a summary from the informer health indicators exposed by {@code runtimeInfo}.
+     *
+     * <p>NOTE(review): only the event sources returned by {@code
+     * unhealthyInformerWrappingEventSourceHealthIndicator()} are inspected. If that map omits fully
+     * healthy event sources (as its name suggests; confirm against JOSDK), then {@code anyHealthy}
+     * is false whenever every informer of the reported event sources is unhealthy, even if other
+     * event sources are healthy.
+     *
+     * @param runtimeInfo operator runtime info to inspect
+     * @return the resulting health summary
+     */
+    public static InformerHealthSummary fromRuntimeInfo(RuntimeInfo runtimeInfo) {
+        Set<InformerIdentifier> unhealthy = new HashSet<>();
+        boolean healthyFound = false;
+
+        var indicatorsByController =
+                runtimeInfo.unhealthyInformerWrappingEventSourceHealthIndicator();
+        for (var controller : indicatorsByController.entrySet()) {
+            for (var eventSource : controller.getValue().entrySet()) {
+                Map<String, InformerHealthIndicator> informers =
+                        eventSource.getValue().informerHealthIndicators();
+                for (var informer : informers.entrySet()) {
+                    if (informer.getValue().getStatus() != Status.HEALTHY) {
+                        unhealthy.add(
+                                new InformerIdentifier(
+                                        controller.getKey(),
+                                        eventSource.getKey(),
+                                        informer.getKey()));
+                    } else {
+                        healthyFound = true;
+                    }
+                }
+            }
+        }
+
+        // With nothing unhealthy, report healthy even though no informer was inspected.
+        boolean anyHealthy = healthyFound || unhealthy.isEmpty();
+        return new InformerHealthSummary(anyHealthy, unhealthy);
+    }
+}
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java
new file mode 100644
index 00000000..65166c82
--- /dev/null
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/health/InformerIdentifier.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.health;
+
+import lombok.Value;
+
+/**
+ * Operator informer identifier: the controller key, event source key and informer key under
+ * which an informer's health was reported.
+ *
+ * <p>Lombok {@code @Value} generates the constructor, getters, equals/hashCode and toString, so
+ * instances can be stored in hash-based sets and compared across health snapshots.
+ */
+@Value
+public class InformerIdentifier {
+
+    /** Controller key under which the informer's event source was reported. */
+    String controllerName;
+    /** Event source key under which the informer was reported. */
+    String eventSourceName;
+    /** Key of the informer in its event source's informer health indicator map. */
+    String informerName;
+}
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java
index dfdd2dff..91385252 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/health/HealthProbeTest.java
@@ -31,11 +31,18 @@ import 
io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
 import io.javaoperatorsdk.operator.Operator;
 import io.javaoperatorsdk.operator.RuntimeInfo;
 import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
+import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
+import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
+import 
io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
+import io.javaoperatorsdk.operator.health.Status;
 import io.javaoperatorsdk.operator.processing.event.ResourceID;
 import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
 import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
@@ -74,11 +81,11 @@ public class HealthProbeTest {
     }
 
     @Test
-    public void testHealthProbe() {
+    public void testHealthProbeInfomers() {
         var isRunning = new AtomicBoolean(false);
-        var isHealthy = new AtomicBoolean(false);
-
-        HealthProbe.INSTANCE.setRuntimeInfo(
+        var unhealthyEventSources =
+                new HashMap<String, Map<String, 
InformerWrappingEventSourceHealthIndicator>>();
+        var runtimeInfo =
                 new RuntimeInfo(new Operator(client)) {
                     @Override
                     public boolean isStarted() {
@@ -86,19 +93,85 @@ public class HealthProbeTest {
                     }
 
                     @Override
-                    public boolean allEventSourcesAreHealthy() {
-                        return isHealthy.get();
+                    public Map<String, Map<String, 
InformerWrappingEventSourceHealthIndicator>>
+                            
unhealthyInformerWrappingEventSourceHealthIndicator() {
+                        return unhealthyEventSources;
                     }
-                });
+                };
+
+        // Test if a new event source becomes unhealthy
+        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
 
         assertFalse(HealthProbe.INSTANCE.isHealthy());
         isRunning.set(true);
+        assertTrue(HealthProbe.INSTANCE.isHealthy());
+        unhealthyEventSources.put(
+                "c1", Map.of("e1", informerHealthIndicator(Map.of("i1", 
Status.UNHEALTHY))));
         assertFalse(HealthProbe.INSTANCE.isHealthy());
-        isHealthy.set(true);
+        unhealthyEventSources.clear();
+        assertTrue(HealthProbe.INSTANCE.isHealthy());
+
+        // Test if we detect when there is an unhealthy at start
+        unhealthyEventSources.put(
+                "c1",
+                Map.of(
+                        "e1",
+                        informerHealthIndicator(
+                                Map.of("i1", Status.UNHEALTHY, "i2", 
Status.HEALTHY))));
+        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
         assertTrue(HealthProbe.INSTANCE.isHealthy());
-        isRunning.set(false);
+        unhealthyEventSources.put(
+                "c1",
+                Map.of(
+                        "e1",
+                        informerHealthIndicator(
+                                Map.of("i1", Status.UNHEALTHY, "i2", 
Status.HEALTHY)),
+                        "e2",
+                        informerHealthIndicator(Map.of("i3", 
Status.UNHEALTHY))));
         assertFalse(HealthProbe.INSTANCE.isHealthy());
-        isRunning.set(true);
+        assertFalse(HealthProbe.INSTANCE.isHealthy());
+        unhealthyEventSources.put(
+                "c1",
+                Map.of(
+                        "e1",
+                        informerHealthIndicator(
+                                Map.of("i1", Status.UNHEALTHY, "i2", 
Status.HEALTHY)),
+                        "e2",
+                        informerHealthIndicator(Map.of("i3", 
Status.HEALTHY))));
+        assertTrue(HealthProbe.INSTANCE.isHealthy());
+        unhealthyEventSources.put(
+                "c1",
+                Map.of(
+                        "e1",
+                        informerHealthIndicator(
+                                Map.of("i1", Status.UNHEALTHY, "i2", 
Status.UNHEALTHY))));
+        assertFalse(HealthProbe.INSTANCE.isHealthy());
+        unhealthyEventSources.clear();
+        assertTrue(HealthProbe.INSTANCE.isHealthy());
+
+        // All informers unhealthy at start:
+        unhealthyEventSources.put(
+                "c1", Map.of("e1", informerHealthIndicator(Map.of("i1", 
Status.UNHEALTHY))));
+        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
+        assertFalse(HealthProbe.INSTANCE.isHealthy());
+    }
+
+    @Test
+    public void testHealthProbeCanary() {
+        var runtimeInfo =
+                new RuntimeInfo(new Operator(client)) {
+                    @Override
+                    public boolean isStarted() {
+                        return true;
+                    }
+
+                    @Override
+                    public Map<String, Map<String, 
InformerWrappingEventSourceHealthIndicator>>
+                            
unhealthyInformerWrappingEventSourceHealthIndicator() {
+                        return Collections.emptyMap();
+                    }
+                };
+        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
 
         var canaryManager =
                 new CanaryResourceManager<FlinkDeployment>(
@@ -142,4 +215,51 @@ public class HealthProbeTest {
         connection.connect();
         return connection.getResponseCode() == OK.code();
     }
+
+    /**
+     * Creates a stub event-source health indicator whose informers report the given statuses.
+     *
+     * @param informerStatuses status to report for each informer, keyed by informer name
+     * @return stub indicator exposing one informer per map entry and no informer configuration
+     */
+    private static InformerWrappingEventSourceHealthIndicator informerHealthIndicator(
+            Map<String, Status> informerStatuses) {
+        var informers = new HashMap<String, InformerHealthIndicator>();
+        for (var entry : informerStatuses.entrySet()) {
+            informers.put(entry.getKey(), informerWithStatus(entry.getValue()));
+        }
+
+        return new InformerWrappingEventSourceHealthIndicator() {
+            @Override
+            public Map<String, InformerHealthIndicator> informerHealthIndicators() {
+                return informers;
+            }
+
+            @Override
+            public ResourceConfiguration getInformerConfiguration() {
+                return null;
+            }
+        };
+    }
+
+    /** Creates a stub informer indicator that reports {@code status} and false/null otherwise. */
+    private static InformerHealthIndicator informerWithStatus(Status status) {
+        return new InformerHealthIndicator() {
+            @Override
+            public boolean hasSynced() {
+                return false;
+            }
+
+            @Override
+            public boolean isWatching() {
+                return false;
+            }
+
+            @Override
+            public boolean isRunning() {
+                return false;
+            }
+
+            @Override
+            public Status getStatus() {
+                return status;
+            }
+
+            @Override
+            public String getTargetNamespace() {
+                return null;
+            }
+        };
+    }
 }

Reply via email to