This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f7c1a3acd refactor: Kubernetes coordinator to be more accurate about nodes readiness (#13493)
3f7c1a3acd is described below

commit 3f7c1a3acd320130cab0fb69d087676d3b733688
Author: kezhenxu94 <[email protected]>
AuthorDate: Mon Sep 15 14:27:28 2025 +0800

    refactor: Kubernetes coordinator to be more accurate about nodes readiness (#13493)
---
 docs/en/changes/changes.md                         |   1 +
 .../plugin/kubernetes/KubernetesCoordinator.java   | 190 ++++-------
 .../KubernetesLabelSelectorEndpointGroup.java      | 215 ++++++++++++
 .../kubernetes/NamespacedPodListInformer.java      |  74 ----
 .../ClusterModuleKubernetesProviderTest.java       |   4 +-
 .../kubernetes/KubernetesCoordinatorTest.java      | 377 ---------------------
 6 files changed, 288 insertions(+), 573 deletions(-)

diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 92a707c947..482abae02c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -96,6 +96,7 @@
 * Fix metrics comparison in promql with bool modifier.
 * Add rate limiter for Zipkin trace receiver to limit maximum spans per second.
 * Open `health-checker` module by default due to latest UI changes. Change the default check period to 30s.
+* Refactor Kubernetes coordinator to be more accurate about node readiness.
 
 #### UI
 
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 97abc20da9..f06ee72818 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -18,15 +18,10 @@
 
 package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
 
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodStatus;
-import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
-import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
-import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
 import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
 import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
 import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
@@ -39,66 +34,79 @@ import 
org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
 import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
 
-import java.util.ArrayList;
-import java.util.Collections;
+import com.google.common.base.Strings;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
-import static 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.ADDED;
-import static 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.DELETED;
-import static 
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.MODIFIED;
-
 
 /**
- * Read collector pod info from api-server of kubernetes, then using all 
containerIp list to construct the list of
+ * Read collector pod info from Kubernetes using 
KubernetesLabelSelectorEndpointGroup, then construct the list of
  * {@link RemoteInstance}.
  */
 @Slf4j
 public class KubernetesCoordinator extends ClusterCoordinator {
     private final ModuleDefineHolder manager;
-    private final String uid;
     private volatile int port = -1;
     private HealthCheckMetrics healthChecker;
     private ClusterModuleKubernetesConfig config;
-    private final Map<String, RemoteInstance> remoteInstanceMap;
-    private volatile List<String> latestInstances;
+
+    private EndpointGroup endpointGroup;
+    private volatile List<RemoteInstance> remoteInstances;
 
     public KubernetesCoordinator(final ModuleDefineHolder manager,
                                  final ClusterModuleKubernetesConfig config) {
-        this.uid = new UidEnvSupplier(config.getUidEnvName()).get();
         this.manager = manager;
         this.config = config;
-        this.remoteInstanceMap = new ConcurrentHashMap<>(20);
-        this.latestInstances = new ArrayList<>(20);
+
+        if (Strings.isNullOrEmpty(config.getLabelSelector())) {
+            throw new IllegalArgumentException("kubernetes labelSelector must 
be provided");
+        }
+    }
+
+    private EndpointGroup createEndpointGroup() {
+        if (port == -1) {
+            port = 
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
+        }
+        final var kubernetesClient = new KubernetesClientBuilder().build();
+        final var builder = 
KubernetesLabelSelectorEndpointGroup.builder(kubernetesClient);
+
+        if (StringUtil.isNotBlank(config.getNamespace())) {
+            builder.namespace(config.getNamespace());
+        }
+
+        final var labelMap = parseLabelSelector(config.getLabelSelector());
+        builder.labelSelector(labelMap);
+
+        builder.port(port);
+        builder.selfUid(new UidEnvSupplier(config.getUidEnvName()).get());
+
+        return builder.build();
+    }
+
+    private Map<String, String> parseLabelSelector(String labelSelector) {
+        final var labels = new HashMap<String, String>();
+        if (StringUtil.isBlank(labelSelector)) {
+            return labels;
+        }
+
+        final var pairs = labelSelector.split(",");
+        for (final var pair : pairs) {
+            final var keyValue = pair.trim().split("=", 2);
+            if (keyValue.length == 2) {
+                labels.put(keyValue[0].trim(), keyValue[1].trim());
+            }
+        }
+        return labels;
     }
 
     @Override
     public List<RemoteInstance> queryRemoteNodes() {
         try {
-            List<Pod> pods = 
NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
-            if (log.isDebugEnabled()) {
-                List<String> uidList = pods
-                    .stream()
-                    .map(item -> item.getMetadata().getUid())
-                    .collect(Collectors.toList());
-                log.debug("[kubernetes cluster pods uid list]:{}", uidList);
-            }
-            if (port == -1) {
-                port = 
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
-            }
-            List<RemoteInstance> remoteInstances =
-                pods.stream()
-                    .filter(pod -> 
StringUtil.isNotBlank(pod.getStatus().getPodIP()))
-                    .map(pod -> new RemoteInstance(
-                        new Address(pod.getStatus().getPodIP(), port, 
pod.getMetadata().getUid().equals(uid))))
-                    .collect(Collectors.toList());
             healthChecker.health();
-            this.latestInstances = remoteInstances.stream().map(it -> 
it.getAddress().toString()).collect(Collectors.toList());
-            if (log.isDebugEnabled()) {
-                remoteInstances.forEach(instance -> log.debug("kubernetes 
cluster instance: {}", instance));
-            }
             return remoteInstances;
         } catch (Throwable e) {
             healthChecker.unHealth(e);
@@ -127,93 +135,33 @@ public class KubernetesCoordinator extends 
ClusterCoordinator {
         }
     }
 
-    private List<Pod> selfPod() {
-        Pod v1Pod = new Pod();
-        v1Pod.setMetadata(new ObjectMeta());
-        v1Pod.setStatus(new PodStatus());
-        v1Pod.getMetadata().setUid(uid);
-        v1Pod.getStatus().setPodIP("127.0.0.1");
-        return Collections.singletonList(v1Pod);
-    }
-
     @Override
     public void start() {
-        initHealthChecker();
-        NamespacedPodListInformer.INFORMER.init(config, new 
K8sResourceEventHandler());
-    }
+        endpointGroup = createEndpointGroup();
+        endpointGroup.addListener(endpoints -> {
+            if (port == -1) {
+                port = 
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
+            }
 
-    class K8sResourceEventHandler implements ResourceEventHandler<Pod> {
+            if (log.isDebugEnabled()) {
+                log.debug("[kubernetes cluster endpoints]: {}", endpoints);
+            }
 
-        @Override
-        public void onAdd(final Pod obj) {
-            updateRemoteInstances(obj, ADDED);
-        }
+            final var instances = endpoints.stream()
+                    .map(endpoint -> new RemoteInstance(new 
Address(endpoint.host(), endpoint.port(), false)))
+                    .collect(Collectors.toList());
 
-        @Override
-        public void onUpdate(final Pod oldObj, final Pod newObj) {
-            updateRemoteInstances(newObj, MODIFIED);
-        }
+            // The endpoint group will never include itself, add it.
+            final var selfInstance = new RemoteInstance(new 
Address("127.0.0.1", port, true));
+            instances.add(selfInstance);
 
-        @Override
-        public void onDelete(final Pod obj, final boolean 
deletedFinalStateUnknown) {
-            updateRemoteInstances(obj, DELETED);
-        }
-    }
-
-    /**
-     * When a remote instance up/off line, will receive multi event according 
to the pod status.
-     * To avoid notify the watchers too frequency, here use a 
`remoteInstanceMap` to cache them.
-     * Only notify watchers once when the instances changed.
-     */
-    private void updateRemoteInstances(Pod pod, EventType event) {
-        try {
-            initHealthChecker();
-            if (StringUtil.isNotBlank(pod.getStatus().getPodIP())) {
-                if (port == -1) {
-                    port = 
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
-                }
-
-                RemoteInstance remoteInstance = new RemoteInstance(
-                    new Address(pod.getStatus().getPodIP(), this.port, 
pod.getMetadata().getUid().equals(uid)));
-                switch (event) {
-                    case ADDED:
-                    case MODIFIED:
-                        if 
("Running".equalsIgnoreCase(pod.getStatus().getPhase())) {
-                            
remoteInstanceMap.put(remoteInstance.getAddress().toString(), remoteInstance);
-                        } else if 
("Terminating".equalsIgnoreCase(pod.getStatus().getPhase())) {
-                            
remoteInstanceMap.remove(remoteInstance.getAddress().toString());
-                        }
-                        break;
-                    case DELETED:
-                        
this.remoteInstanceMap.remove(remoteInstance.getAddress().toString());
-                        break;
-                    default:
-                        return;
-                }
-                updateRemoteInstances();
+            if (log.isDebugEnabled()) {
+                instances.forEach(instance -> log.debug("kubernetes cluster 
instance: {}", instance));
             }
-        } catch (Throwable e) {
-            healthChecker.unHealth(e);
-            log.error("Failed to notify RemoteInstances update.", e);
-        }
-    }
-
-    private void updateRemoteInstances() {
-        List<String> updatedInstances = new 
ArrayList<>(this.remoteInstanceMap.keySet());
-        if (this.latestInstances.size() != updatedInstances.size() || 
!this.latestInstances.containsAll(updatedInstances)) {
-            List<RemoteInstance> remoteInstances = new 
ArrayList<>(this.remoteInstanceMap.values());
-            this.latestInstances = updatedInstances;
-            checkHealth(remoteInstances);
-            notifyWatchers(remoteInstances);
-        }
-    }
 
-    private void checkHealth(List<RemoteInstance> remoteInstances) {
-        ClusterHealthStatus healthStatus = 
OAPNodeChecker.isHealth(remoteInstances);
-        if (healthStatus.isHealth()) {
-            this.healthChecker.health();
-        } else {
-            this.healthChecker.unHealth(healthStatus.getReason());
-        }
+            this.remoteInstances = instances;
+            notifyWatchers(instances);
+        }, true);
+        initHealthChecker();
     }
 }
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
new file mode 100644
index 0000000000..36fa6d6aae
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup;
+import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
+import com.linecorp.armeria.internal.shaded.guava.base.Strings;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.cache.Lister;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup 
{
+
+    private final KubernetesClient kubernetesClient;
+    private final String namespace;
+    private final Map<String, String> labelSelector;
+    private final int port;
+    private final String portName;
+    private final SharedIndexInformer<Pod> podInformer;
+    private final String selfUid;
+
+    private KubernetesLabelSelectorEndpointGroup(Builder builder) {
+        super(builder.selectionStrategy);
+        this.kubernetesClient = builder.kubernetesClient;
+        this.namespace = builder.namespace;
+        this.labelSelector = builder.labelSelector;
+        this.port = builder.port;
+        this.portName = builder.portName;
+        this.selfUid = Strings.nullToEmpty(builder.selfUid);
+
+        this.podInformer = kubernetesClient.pods()
+                .inNamespace(namespace)
+                .withLabels(labelSelector)
+                .inform(new PodEventHandler());
+
+        updateEndpoints();
+    }
+
+    public static Builder builder(KubernetesClient kubernetesClient) {
+        return new Builder(kubernetesClient);
+    }
+
+    @Override
+    protected void doCloseAsync(CompletableFuture<?> future) {
+        if (podInformer != null) {
+            try {
+                podInformer.close();
+                future.complete(null);
+            } catch (Exception e) {
+                future.completeExceptionally(e);
+            }
+        } else {
+            future.complete(null);
+        }
+    }
+
+    private void updateEndpoints() {
+        try {
+            if (podInformer == null) {
+                log.warn("Pod informer is not initialized yet.");
+                return;
+            }
+            final var podLister = new Lister<>(podInformer.getIndexer());
+            final var pods = podLister.namespace(namespace).list();
+
+            final var newEndpoints = pods.stream()
+                    .filter(this::isPodReady)
+                    .filter(pod -> 
StringUtil.isNotBlank(pod.getStatus().getPodIP()))
+                    .filter(pod -> !pod.getMetadata().getUid().equals(selfUid))
+                    .map(this::createEndpoint)
+                    .filter(endpoint -> endpoint != null)
+                    .collect(Collectors.toList());
+
+            log.debug("Updating endpoints to: {}", newEndpoints);
+            setEndpoints(newEndpoints);
+        } catch (Exception e) {
+            log.error("Failed to update endpoints", e);
+        }
+    }
+
+    private boolean isPodReady(Pod pod) {
+        final var podStatus = pod.getStatus();
+        if (podStatus == null) {
+            return false;
+        }
+        if (!"Running".equalsIgnoreCase(podStatus.getPhase())) {
+            return false;
+        }
+        if (podStatus.getContainerStatuses() == null || 
podStatus.getContainerStatuses().isEmpty()) {
+            return false;
+        }
+        if (podStatus.getConditions() == null || 
podStatus.getConditions().isEmpty()) {
+            return false;
+        }
+
+        final var allContainersReady =
+            podStatus.getContainerStatuses()
+                .stream().allMatch(containerStatus -> 
containerStatus.getReady() != Boolean.FALSE);
+        final var podReadyCondition =
+            podStatus.getConditions()
+                .stream()
+                .anyMatch(condition -> 
"Ready".equalsIgnoreCase(condition.getType())
+                    && condition.getStatus() != null
+                    && condition.getStatus().equalsIgnoreCase("True"));
+        return allContainersReady && podReadyCondition;
+    }
+
+    private Endpoint createEndpoint(Pod pod) {
+        final var podIP = pod.getStatus().getPodIP();
+        if (StringUtil.isBlank(podIP)) {
+            return null;
+        }
+
+        final var targetPort = determineTargetPort(pod);
+        if (targetPort <= 0) {
+            log.warn("Could not determine target port for pod: {}", 
pod.getMetadata().getName());
+            return null;
+        }
+
+        return Endpoint.of(podIP, targetPort);
+    }
+
+    private int determineTargetPort(Pod pod) {
+        if (port > 0) {
+            return port;
+        }
+
+        if (StringUtil.isNotBlank(portName) && pod.getSpec().getContainers() 
!= null) {
+            return pod.getSpec().getContainers().stream()
+                    .flatMap(container -> container.getPorts() != null ? 
container.getPorts().stream() : null)
+                    .filter(containerPort -> 
portName.equals(containerPort.getName()))
+                    .mapToInt(containerPort -> 
containerPort.getContainerPort())
+                    .findFirst()
+                    .orElse(-1);
+        }
+
+        return -1;
+    }
+
+    private class PodEventHandler implements ResourceEventHandler<Pod> {
+        @Override
+        public void onAdd(Pod pod) {
+            log.debug("Pod added: {}", pod.getMetadata().getName());
+            updateEndpoints();
+        }
+
+        @Override
+        public void onUpdate(Pod oldPod, Pod newPod) {
+            log.debug("Pod updated: {}, {}", newPod.getMetadata().getName(), 
newPod.getStatus());
+            updateEndpoints();
+        }
+
+        @Override
+        public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
+            log.debug("Pod deleted: {}", pod.getMetadata().getName());
+            updateEndpoints();
+        }
+    }
+
+    @Data
+    @Accessors(fluent = true)
+    public static class Builder {
+        private final KubernetesClient kubernetesClient;
+        private String namespace = "default";
+        private Map<String, String> labelSelector = new ConcurrentHashMap<>();
+        private int port = -1;
+        private String portName;
+        private EndpointSelectionStrategy selectionStrategy = 
EndpointSelectionStrategy.weightedRoundRobin();
+        private String selfUid;
+
+        private Builder(KubernetesClient kubernetesClient) {
+            this.kubernetesClient = kubernetesClient;
+        }
+
+        public KubernetesLabelSelectorEndpointGroup build() {
+            if (port <= 0 && StringUtil.isBlank(portName)) {
+                throw new IllegalArgumentException("Either port or portName 
must be specified");
+            }
+            if (labelSelector.isEmpty()) {
+                throw new IllegalArgumentException("Label selector must not be 
empty");
+            }
+            return new KubernetesLabelSelectorEndpointGroup(this);
+        }
+    }
+}
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
deleted file mode 100644
index ac5e9d8f6a..0000000000
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.KubernetesClientBuilder;
-import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
-import io.fabric8.kubernetes.client.informers.cache.Lister;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.isNull;
-
-@Slf4j
-public enum NamespacedPodListInformer {
-
-    /**
-     * contains remote collector instances
-     */
-    INFORMER;
-
-    private Lister<Pod> podLister;
-
-    public synchronized void init(ClusterModuleKubernetesConfig podConfig, 
ResourceEventHandler<Pod> eventHandler) {
-        try {
-            final var kubernetesClient = new KubernetesClientBuilder().build();
-            final var informer = kubernetesClient
-                .pods()
-                .inNamespace(podConfig.getNamespace())
-                .withLabelSelector(podConfig.getLabelSelector())
-                .inform(eventHandler);
-
-            podLister = new Lister<>(informer.getIndexer());
-
-            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
-                informer.close();
-                kubernetesClient.close();
-            }));
-        } catch (Exception e) {
-            log.error("cannot connect with api server in kubernetes", e);
-        }
-    }
-
-    public Optional<List<Pod>> listPods() {
-        if (isNull(podLister)) {
-            return Optional.empty();
-        }
-        return Optional.ofNullable(podLister.list().size() != 0
-            ? podLister.list()
-                       .stream()
-                       .filter(pod -> 
"Running".equalsIgnoreCase(pod.getStatus().getPhase()))
-                       .collect(Collectors.toList())
-            : null);
-    }
-}
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
index 1f146bed23..11d5da56b1 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
@@ -46,10 +46,12 @@ public class ClusterModuleKubernetesProviderTest {
 
     @BeforeEach
     public void before() {
+        final var config = new ClusterModuleKubernetesConfig();
+        config.setLabelSelector("app=oap");
         TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
         Whitebox.setInternalState(telemetryModule, "loadedProvider", 
telemetryProvider);
         provider.setManager(moduleManager);
-        Whitebox.setInternalState(provider, "config", new 
ClusterModuleKubernetesConfig());
+        Whitebox.setInternalState(provider, "config", config);
     }
 
     @Test
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
deleted file mode 100644
index 4e35e9e134..0000000000
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodCondition;
-import io.fabric8.kubernetes.api.model.PodStatus;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
-import org.apache.skywalking.oap.server.core.cluster.ClusterWatcher;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
-import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
-import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
-import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
-import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariable;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-public class KubernetesCoordinatorTest {
-    public static final String LOCAL_HOST = "127.0.0.1";
-    public static final String REMOTE_HOST = "127.0.0.2";
-    public static final Integer GRPC_PORT = 11800;
-    public static final String SELF_UID = "self";
-    public static final String REMOTE_UID = "remote";
-
-    @Mock
-    private ModuleManager moduleManager;
-    @Mock
-    private NoneTelemetryProvider telemetryProvider;
-    private ModuleProvider providerA;
-    private Address addressA;
-    private Address addressB;
-    private KubernetesCoordinator coordinatorA;
-    private KubernetesCoordinator coordinatorB;
-    private Pod podA;
-    private Pod podB;
-
-    @BeforeEach
-    public void prepare() {
-        Mockito.when(telemetryProvider.getService(MetricsCreator.class))
-               .thenReturn(new MetricsCreatorNoop());
-        TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
-        Whitebox.setInternalState(telemetryModule, "loadedProvider", telemetryProvider);
-        NamespacedPodListInformer informer = mock(NamespacedPodListInformer.class);
-        Whitebox.setInternalState(NamespacedPodListInformer.class, "INFORMER", informer);
-        when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
-        when(moduleManager.find(CoreModule.NAME)).thenReturn(mock(ModuleProviderHolder.class));
-        when(moduleManager.find(CoreModule.NAME).provider()).thenReturn(mock(ModuleServiceHolder.class));
-        when(moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class)).thenReturn(
-            mock(ConfigService.class));
-        when(moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort()).thenReturn(
-            GRPC_PORT);
-
-        addressA = new Address(LOCAL_HOST, GRPC_PORT, true);
-        addressB = new Address(REMOTE_HOST, GRPC_PORT, true);
-        podA = mockPod(SELF_UID, LOCAL_HOST);
-        podB = mockPod(REMOTE_UID, REMOTE_HOST);
-    }
-
-    @Test
-    public void queryRemoteNodesWhenInformerNotwork() throws Exception {
-        withEnvironmentVariable(SELF_UID, SELF_UID + "0").execute(() -> {
-            providerA = createProvider(SELF_UID);
-            coordinatorA = getClusterCoordinator(providerA);
-            coordinatorA.start();
-        });
-
-        KubernetesCoordinator coordinator = getClusterCoordinator(providerA);
-        doReturn(Optional.empty()).when(NamespacedPodListInformer.INFORMER).listPods();
-        List<RemoteInstance> remoteInstances = Whitebox.invokeMethod(coordinator, "queryRemoteNodes");
-        Assertions.assertEquals(1, remoteInstances.size());
-        Assertions.assertEquals(addressA, remoteInstances.get(0).getAddress());
-    }
-
-    @Test
-    public void queryRemoteNodesWhenInformerWork() throws Exception {
-        withEnvironmentVariable(SELF_UID + "0", SELF_UID + "0")
-                .execute(() -> {
-                    ModuleProvider provider = createProvider(SELF_UID + "0");
-                    KubernetesCoordinator coordinator = getClusterCoordinator(provider);
-                    coordinator.start();
-                    doReturn(Optional.of(mockPodList())).when(NamespacedPodListInformer.INFORMER).listPods();
-                    List<RemoteInstance> remoteInstances = Whitebox.invokeMethod(coordinator, "queryRemoteNodes");
-                    Assertions.assertEquals(5, remoteInstances.size());
-                    List<RemoteInstance> self = remoteInstances.stream()
-                            .filter(item -> item.getAddress().isSelf())
-                            .collect(Collectors.toList());
-                    List<RemoteInstance> others = remoteInstances.stream()
-                            .filter(item -> !item.getAddress().isSelf())
-                            .collect(Collectors.toList());
-
-                    Assertions.assertEquals(1, self.size());
-                    Assertions.assertEquals(4, others.size());
-                });
-    }
-
-    @Test
-    public void registerRemote() throws Exception {
-        RemoteInstance instance = new RemoteInstance(addressA);
-        withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
-            providerA = createProvider(SELF_UID);
-            coordinatorA = getClusterCoordinator(providerA);
-            coordinatorA.start();
-        });
-        doReturn(Optional.of(Collections.singletonList(podA)))
-                .when(NamespacedPodListInformer.INFORMER)
-                .listPods();
-
-        ClusterMockWatcher watcher = new ClusterMockWatcher();
-        coordinatorA.registerWatcher(watcher);
-        coordinatorA.registerRemote(instance);
-        KubernetesCoordinator.K8sResourceEventHandler listener = coordinatorA.new K8sResourceEventHandler();
-        listener.onAdd(podA);
-
-        List<RemoteInstance> remoteInstances = watcher.getRemoteInstances();
-        assertEquals(1, remoteInstances.size());
-        assertEquals(1, coordinatorA.queryRemoteNodes().size());
-        Address queryAddress = remoteInstances.get(0).getAddress();
-        assertEquals(addressA, queryAddress);
-        assertTrue(queryAddress.isSelf());
-    }
-
-    @Test
-    public void registerRemoteOfReceiver() throws Exception {
-        withEnvironmentVariable(SELF_UID, SELF_UID + "0").execute(() -> {
-            providerA = createProvider(SELF_UID);
-            coordinatorA = getClusterCoordinator(providerA);
-            coordinatorA.start();
-        });
-        withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
-            ModuleProvider providerB = createProvider(REMOTE_UID);
-            coordinatorB = getClusterCoordinator(providerB);
-        });
-        coordinatorB.start();
-
-        ClusterMockWatcher watcherB = new ClusterMockWatcher();
-        coordinatorB.registerWatcher(watcherB);
-
-        doReturn(Optional.of(Collections.singletonList(podA)))
-                .when(NamespacedPodListInformer.INFORMER)
-                .listPods();
-        RemoteInstance instance = new RemoteInstance(addressA);
-        coordinatorA.registerRemote(instance);
-        KubernetesCoordinator.K8sResourceEventHandler listener = coordinatorB.new K8sResourceEventHandler();
-        listener.onAdd(podA);
-
-        // Receiver
-        List<RemoteInstance> remoteInstances = watcherB.getRemoteInstances();
-        assertEquals(1, remoteInstances.size());
-        assertEquals(1, coordinatorB.queryRemoteNodes().size());
-        Address queryAddress = remoteInstances.get(0).getAddress();
-        assertEquals(addressA, queryAddress);
-        assertFalse(queryAddress.isSelf());
-    }
-
-    @Test
-    public void registerRemoteOfCluster() throws Exception {
-        withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
-            providerA = createProvider(SELF_UID);
-            coordinatorA = getClusterCoordinator(providerA);
-            coordinatorA.start();
-        });
-        withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
-            ModuleProvider providerB = createProvider(REMOTE_UID);
-            coordinatorB = getClusterCoordinator(providerB);
-        });
-        coordinatorB.start();
-
-        ClusterMockWatcher watcherA = new ClusterMockWatcher();
-        coordinatorA.registerWatcher(watcherA);
-        ClusterMockWatcher watcherB = new ClusterMockWatcher();
-        coordinatorB.registerWatcher(watcherB);
-
-        doReturn(Optional.of(Arrays.asList(podA, podB)))
-                .when(NamespacedPodListInformer.INFORMER)
-                .listPods();
-        RemoteInstance instanceA = new RemoteInstance(addressA);
-        RemoteInstance instanceB = new RemoteInstance(addressB);
-        coordinatorA.registerRemote(instanceA);
-        coordinatorB.registerRemote(instanceB);
-
-        KubernetesCoordinator.K8sResourceEventHandler listenerA = coordinatorA.new K8sResourceEventHandler();
-        listenerA.onAdd(podA);
-        listenerA.onAdd(podB);
-        KubernetesCoordinator.K8sResourceEventHandler listenerB = coordinatorB.new K8sResourceEventHandler();
-        listenerB.onAdd(podA);
-        listenerB.onAdd(podB);
-
-        List<RemoteInstance> remoteInstancesOfA = watcherA.getRemoteInstances();
-        validateServiceInstance(addressA, addressB, remoteInstancesOfA);
-        assertEquals(2, coordinatorA.queryRemoteNodes().size());
-
-        List<RemoteInstance> remoteInstancesOfB = watcherB.getRemoteInstances();
-        validateServiceInstance(addressB, addressA, remoteInstancesOfB);
-        assertEquals(2, coordinatorB.queryRemoteNodes().size());
-    }
-
-    @Test
-    public void deregisterRemoteOfCluster() throws Exception {
-        withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
-            providerA = createProvider(SELF_UID);
-            coordinatorA = getClusterCoordinator(providerA);
-            coordinatorA.start();
-        });
-        withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
-            ModuleProvider providerB = createProvider(REMOTE_UID);
-            coordinatorB = getClusterCoordinator(providerB);
-        });
-        coordinatorB.start();
-
-        ClusterMockWatcher watcherA = new ClusterMockWatcher();
-        coordinatorA.registerWatcher(watcherA);
-
-        ClusterMockWatcher watcherB = new ClusterMockWatcher();
-        coordinatorB.registerWatcher(watcherB);
-
-        doReturn(Optional.of(Arrays.asList(podA, podB)))
-                .when(NamespacedPodListInformer.INFORMER)
-                .listPods();
-        RemoteInstance instanceA = new RemoteInstance(addressA);
-        RemoteInstance instanceB = new RemoteInstance(addressB);
-        coordinatorA.registerRemote(instanceA);
-        coordinatorB.registerRemote(instanceB);
-
-        KubernetesCoordinator.K8sResourceEventHandler listenerA = coordinatorA.new K8sResourceEventHandler();
-        listenerA.onAdd(podA);
-        listenerA.onAdd(podB);
-        KubernetesCoordinator.K8sResourceEventHandler listenerB = coordinatorB.new K8sResourceEventHandler();
-        listenerB.onAdd(podA);
-        listenerB.onAdd(podB);
-
-        List<RemoteInstance> remoteInstancesOfA = watcherA.getRemoteInstances();
-        validateServiceInstance(addressA, addressB, remoteInstancesOfA);
-        assertEquals(2, coordinatorA.queryRemoteNodes().size());
-
-        List<RemoteInstance> remoteInstancesOfB = watcherB.getRemoteInstances();
-        validateServiceInstance(addressB, addressA, remoteInstancesOfB);
-        assertEquals(2, coordinatorB.queryRemoteNodes().size());
-
-        // deregister A
-        listenerB.onDelete(podA, false);
-        doReturn(Optional.of(Collections.singletonList(podB)))
-                .when(NamespacedPodListInformer.INFORMER)
-                .listPods();
-        // only B
-        remoteInstancesOfB = watcherB.getRemoteInstances();
-        assertEquals(1, remoteInstancesOfB.size());
-        assertEquals(1, coordinatorB.queryRemoteNodes().size());
-
-        Address address = remoteInstancesOfB.get(0).getAddress();
-        assertEquals(addressB, address);
-        assertTrue(address.isSelf());
-    }
-
-    private ClusterModuleKubernetesProvider createProvider(String uidEnvName) {
-        ClusterModuleKubernetesProvider provider = new ClusterModuleKubernetesProvider();
-
-        ClusterModuleKubernetesConfig config = new ClusterModuleKubernetesConfig();
-        provider.newConfigCreator().onInitialized(config);
-        config.setNamespace("default");
-        config.setLabelSelector("app=oap");
-        config.setUidEnvName(uidEnvName);
-
-        provider.setManager(moduleManager);
-        provider.prepare();
-        provider.start();
-        provider.notifyAfterCompleted();
-        return provider;
-    }
-
-    private KubernetesCoordinator getClusterCoordinator(ModuleProvider provider) {
-        return (KubernetesCoordinator) provider.getService(ClusterCoordinator.class);
-    }
-
-    private Pod mockPod(String uid, String ip) {
-        Pod v1Pod = new Pod();
-        v1Pod.setMetadata(new ObjectMeta());
-        v1Pod.setStatus(new PodStatus());
-        v1Pod.getStatus().setPhase("Running");
-        v1Pod.getMetadata().setUid(uid);
-        v1Pod.getStatus().setPodIP(ip);
-        v1Pod.getStatus().setConditions(List.of(new PodCondition("", "", "", "", "True", "Ready")));
-        return v1Pod;
-    }
-
-    private List<Pod> mockPodList() {
-        List<Pod> pods = new ArrayList<>();
-        for (int i = 0; i < 5; i++) {
-            Pod v1Pod = new Pod();
-            v1Pod.setMetadata(new ObjectMeta());
-            v1Pod.setStatus(new PodStatus());
-            v1Pod.getMetadata().setUid(SELF_UID + i);
-            v1Pod.getStatus().setPodIP(LOCAL_HOST);
-            v1Pod.getStatus().setConditions(List.of(new PodCondition("", "", "", "", "True", "Ready")));
-            pods.add(v1Pod);
-        }
-        return pods;
-    }
-
-    private void validateServiceInstance(Address selfAddress, Address otherAddress,
-                                         List<RemoteInstance> queryResult) {
-        assertEquals(2, queryResult.size());
-
-        boolean selfExist = false, otherExist = false;
-
-        for (RemoteInstance instance : queryResult) {
-            Address queryAddress = instance.getAddress();
-            if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
-                selfExist = true;
-            } else if (queryAddress.equals(otherAddress) && !queryAddress.isSelf()) {
-                otherExist = true;
-            }
-        }
-
-        assertTrue(selfExist);
-        assertTrue(otherExist);
-    }
-
-    static class ClusterMockWatcher implements ClusterWatcher {
-        @Getter
-        private List<RemoteInstance> remoteInstances = new ArrayList<>();
-
-        @Override
-        public void onClusterNodesChanged(final List<RemoteInstance> remoteInstances) {
-            this.remoteInstances = remoteInstances;
-        }
-    }
-}


Reply via email to