This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 3f7c1a3acd refactor: Kubernetes coordinator to be more accurate about nodes readiness (#13493)
3f7c1a3acd is described below
commit 3f7c1a3acd320130cab0fb69d087676d3b733688
Author: kezhenxu94 <[email protected]>
AuthorDate: Mon Sep 15 14:27:28 2025 +0800
refactor: Kubernetes coordinator to be more accurate about nodes readiness (#13493)
---
docs/en/changes/changes.md | 1 +
.../plugin/kubernetes/KubernetesCoordinator.java | 190 ++++-------
.../KubernetesLabelSelectorEndpointGroup.java | 215 ++++++++++++
.../kubernetes/NamespacedPodListInformer.java | 74 ----
.../ClusterModuleKubernetesProviderTest.java | 4 +-
.../kubernetes/KubernetesCoordinatorTest.java | 377 ---------------------
6 files changed, 288 insertions(+), 573 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 92a707c947..482abae02c 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -96,6 +96,7 @@
* Fix metrics comparison in promql with bool modifier.
* Add rate limiter for Zipkin trace receiver to limit maximum spans per second.
* Open `health-checker` module by default due to latest UI changes. Change the default check period to 30s.
+* Refactor Kubernetes coordinator to be more accurate about node readiness.
#### UI
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 97abc20da9..f06ee72818 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -18,15 +18,10 @@
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodStatus;
-import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
-import org.apache.skywalking.oap.server.core.cluster.ClusterHealthStatus;
-import org.apache.skywalking.oap.server.core.cluster.OAPNodeChecker;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.cluster.ServiceQueryException;
import org.apache.skywalking.oap.server.core.cluster.ServiceRegisterException;
@@ -39,66 +34,79 @@ import org.apache.skywalking.oap.server.telemetry.api.HealthCheckMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
-import java.util.ArrayList;
-import java.util.Collections;
+import com.google.common.base.Strings;
+import com.linecorp.armeria.client.endpoint.EndpointGroup;
+
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
-import static
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.ADDED;
-import static
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.DELETED;
-import static
org.apache.skywalking.oap.server.cluster.plugin.kubernetes.EventType.MODIFIED;
-
/**
- * Read collector pod info from api-server of kubernetes, then using all
containerIp list to construct the list of
+ * Read collector pod info from Kubernetes using
KubernetesLabelSelectorEndpointGroup, then construct the list of
* {@link RemoteInstance}.
*/
@Slf4j
public class KubernetesCoordinator extends ClusterCoordinator {
private final ModuleDefineHolder manager;
- private final String uid;
private volatile int port = -1;
private HealthCheckMetrics healthChecker;
private ClusterModuleKubernetesConfig config;
- private final Map<String, RemoteInstance> remoteInstanceMap;
- private volatile List<String> latestInstances;
+
+ private EndpointGroup endpointGroup;
+ private volatile List<RemoteInstance> remoteInstances;
public KubernetesCoordinator(final ModuleDefineHolder manager,
final ClusterModuleKubernetesConfig config) {
- this.uid = new UidEnvSupplier(config.getUidEnvName()).get();
this.manager = manager;
this.config = config;
- this.remoteInstanceMap = new ConcurrentHashMap<>(20);
- this.latestInstances = new ArrayList<>(20);
+
+ if (Strings.isNullOrEmpty(config.getLabelSelector())) {
+ throw new IllegalArgumentException("kubernetes labelSelector must
be provided");
+ }
+ }
+
+ private EndpointGroup createEndpointGroup() {
+ if (port == -1) {
+ port =
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
+ }
+ final var kubernetesClient = new KubernetesClientBuilder().build();
+ final var builder =
KubernetesLabelSelectorEndpointGroup.builder(kubernetesClient);
+
+ if (StringUtil.isNotBlank(config.getNamespace())) {
+ builder.namespace(config.getNamespace());
+ }
+
+ final var labelMap = parseLabelSelector(config.getLabelSelector());
+ builder.labelSelector(labelMap);
+
+ builder.port(port);
+ builder.selfUid(new UidEnvSupplier(config.getUidEnvName()).get());
+
+ return builder.build();
+ }
+
+ private Map<String, String> parseLabelSelector(String labelSelector) {
+ final var labels = new HashMap<String, String>();
+ if (StringUtil.isBlank(labelSelector)) {
+ return labels;
+ }
+
+ final var pairs = labelSelector.split(",");
+ for (final var pair : pairs) {
+ final var keyValue = pair.trim().split("=", 2);
+ if (keyValue.length == 2) {
+ labels.put(keyValue[0].trim(), keyValue[1].trim());
+ }
+ }
+ return labels;
}
@Override
public List<RemoteInstance> queryRemoteNodes() {
try {
- List<Pod> pods =
NamespacedPodListInformer.INFORMER.listPods().orElseGet(this::selfPod);
- if (log.isDebugEnabled()) {
- List<String> uidList = pods
- .stream()
- .map(item -> item.getMetadata().getUid())
- .collect(Collectors.toList());
- log.debug("[kubernetes cluster pods uid list]:{}", uidList);
- }
- if (port == -1) {
- port =
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
- }
- List<RemoteInstance> remoteInstances =
- pods.stream()
- .filter(pod ->
StringUtil.isNotBlank(pod.getStatus().getPodIP()))
- .map(pod -> new RemoteInstance(
- new Address(pod.getStatus().getPodIP(), port,
pod.getMetadata().getUid().equals(uid))))
- .collect(Collectors.toList());
healthChecker.health();
- this.latestInstances = remoteInstances.stream().map(it ->
it.getAddress().toString()).collect(Collectors.toList());
- if (log.isDebugEnabled()) {
- remoteInstances.forEach(instance -> log.debug("kubernetes
cluster instance: {}", instance));
- }
return remoteInstances;
} catch (Throwable e) {
healthChecker.unHealth(e);
@@ -127,93 +135,33 @@ public class KubernetesCoordinator extends ClusterCoordinator {
}
}
- private List<Pod> selfPod() {
- Pod v1Pod = new Pod();
- v1Pod.setMetadata(new ObjectMeta());
- v1Pod.setStatus(new PodStatus());
- v1Pod.getMetadata().setUid(uid);
- v1Pod.getStatus().setPodIP("127.0.0.1");
- return Collections.singletonList(v1Pod);
- }
-
@Override
public void start() {
- initHealthChecker();
- NamespacedPodListInformer.INFORMER.init(config, new
K8sResourceEventHandler());
- }
+ endpointGroup = createEndpointGroup();
+ endpointGroup.addListener(endpoints -> {
+ if (port == -1) {
+ port =
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
+ }
- class K8sResourceEventHandler implements ResourceEventHandler<Pod> {
+ if (log.isDebugEnabled()) {
+ log.debug("[kubernetes cluster endpoints]: {}", endpoints);
+ }
- @Override
- public void onAdd(final Pod obj) {
- updateRemoteInstances(obj, ADDED);
- }
+ final var instances = endpoints.stream()
+ .map(endpoint -> new RemoteInstance(new
Address(endpoint.host(), endpoint.port(), false)))
+ .collect(Collectors.toList());
- @Override
- public void onUpdate(final Pod oldObj, final Pod newObj) {
- updateRemoteInstances(newObj, MODIFIED);
- }
+ // The endpoint group will never include itself, add it.
+ final var selfInstance = new RemoteInstance(new
Address("127.0.0.1", port, true));
+ instances.add(selfInstance);
- @Override
- public void onDelete(final Pod obj, final boolean
deletedFinalStateUnknown) {
- updateRemoteInstances(obj, DELETED);
- }
- }
-
- /**
- * When a remote instance up/off line, will receive multi event according
to the pod status.
- * To avoid notify the watchers too frequency, here use a
`remoteInstanceMap` to cache them.
- * Only notify watchers once when the instances changed.
- */
- private void updateRemoteInstances(Pod pod, EventType event) {
- try {
- initHealthChecker();
- if (StringUtil.isNotBlank(pod.getStatus().getPodIP())) {
- if (port == -1) {
- port =
manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
- }
-
- RemoteInstance remoteInstance = new RemoteInstance(
- new Address(pod.getStatus().getPodIP(), this.port,
pod.getMetadata().getUid().equals(uid)));
- switch (event) {
- case ADDED:
- case MODIFIED:
- if
("Running".equalsIgnoreCase(pod.getStatus().getPhase())) {
-
remoteInstanceMap.put(remoteInstance.getAddress().toString(), remoteInstance);
- } else if
("Terminating".equalsIgnoreCase(pod.getStatus().getPhase())) {
-
remoteInstanceMap.remove(remoteInstance.getAddress().toString());
- }
- break;
- case DELETED:
-
this.remoteInstanceMap.remove(remoteInstance.getAddress().toString());
- break;
- default:
- return;
- }
- updateRemoteInstances();
+ if (log.isDebugEnabled()) {
+ instances.forEach(instance -> log.debug("kubernetes cluster
instance: {}", instance));
}
- } catch (Throwable e) {
- healthChecker.unHealth(e);
- log.error("Failed to notify RemoteInstances update.", e);
- }
- }
-
- private void updateRemoteInstances() {
- List<String> updatedInstances = new
ArrayList<>(this.remoteInstanceMap.keySet());
- if (this.latestInstances.size() != updatedInstances.size() ||
!this.latestInstances.containsAll(updatedInstances)) {
- List<RemoteInstance> remoteInstances = new
ArrayList<>(this.remoteInstanceMap.values());
- this.latestInstances = updatedInstances;
- checkHealth(remoteInstances);
- notifyWatchers(remoteInstances);
- }
- }
- private void checkHealth(List<RemoteInstance> remoteInstances) {
- ClusterHealthStatus healthStatus =
OAPNodeChecker.isHealth(remoteInstances);
- if (healthStatus.isHealth()) {
- this.healthChecker.health();
- } else {
- this.healthChecker.unHealth(healthStatus.getReason());
- }
+ this.remoteInstances = instances;
+ notifyWatchers(instances);
+ }, true);
+ initHealthChecker();
}
}
diff --git
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
new file mode 100644
index 0000000000..36fa6d6aae
--- /dev/null
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesLabelSelectorEndpointGroup.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
+
+import com.linecorp.armeria.client.Endpoint;
+import com.linecorp.armeria.client.endpoint.DynamicEndpointGroup;
+import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;
+import com.linecorp.armeria.internal.shaded.guava.base.Strings;
+
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
+import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
+import io.fabric8.kubernetes.client.informers.cache.Lister;
+import lombok.Data;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class KubernetesLabelSelectorEndpointGroup extends DynamicEndpointGroup
{
+
+ private final KubernetesClient kubernetesClient;
+ private final String namespace;
+ private final Map<String, String> labelSelector;
+ private final int port;
+ private final String portName;
+ private final SharedIndexInformer<Pod> podInformer;
+ private final String selfUid;
+
+ private KubernetesLabelSelectorEndpointGroup(Builder builder) {
+ super(builder.selectionStrategy);
+ this.kubernetesClient = builder.kubernetesClient;
+ this.namespace = builder.namespace;
+ this.labelSelector = builder.labelSelector;
+ this.port = builder.port;
+ this.portName = builder.portName;
+ this.selfUid = Strings.nullToEmpty(builder.selfUid);
+
+ this.podInformer = kubernetesClient.pods()
+ .inNamespace(namespace)
+ .withLabels(labelSelector)
+ .inform(new PodEventHandler());
+
+ updateEndpoints();
+ }
+
+ public static Builder builder(KubernetesClient kubernetesClient) {
+ return new Builder(kubernetesClient);
+ }
+
+ @Override
+ protected void doCloseAsync(CompletableFuture<?> future) {
+ if (podInformer != null) {
+ try {
+ podInformer.close();
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ } else {
+ future.complete(null);
+ }
+ }
+
+ private void updateEndpoints() {
+ try {
+ if (podInformer == null) {
+ log.warn("Pod informer is not initialized yet.");
+ return;
+ }
+ final var podLister = new Lister<>(podInformer.getIndexer());
+ final var pods = podLister.namespace(namespace).list();
+
+ final var newEndpoints = pods.stream()
+ .filter(this::isPodReady)
+ .filter(pod ->
StringUtil.isNotBlank(pod.getStatus().getPodIP()))
+ .filter(pod -> !pod.getMetadata().getUid().equals(selfUid))
+ .map(this::createEndpoint)
+ .filter(endpoint -> endpoint != null)
+ .collect(Collectors.toList());
+
+ log.debug("Updating endpoints to: {}", newEndpoints);
+ setEndpoints(newEndpoints);
+ } catch (Exception e) {
+ log.error("Failed to update endpoints", e);
+ }
+ }
+
+ private boolean isPodReady(Pod pod) {
+ final var podStatus = pod.getStatus();
+ if (podStatus == null) {
+ return false;
+ }
+ if (!"Running".equalsIgnoreCase(podStatus.getPhase())) {
+ return false;
+ }
+ if (podStatus.getContainerStatuses() == null ||
podStatus.getContainerStatuses().isEmpty()) {
+ return false;
+ }
+ if (podStatus.getConditions() == null ||
podStatus.getConditions().isEmpty()) {
+ return false;
+ }
+
+ final var allContainersReady =
+ podStatus.getContainerStatuses()
+ .stream().allMatch(containerStatus ->
containerStatus.getReady() != Boolean.FALSE);
+ final var podReadyCondition =
+ podStatus.getConditions()
+ .stream()
+ .anyMatch(condition ->
"Ready".equalsIgnoreCase(condition.getType())
+ && condition.getStatus() != null
+ && condition.getStatus().equalsIgnoreCase("True"));
+ return allContainersReady && podReadyCondition;
+ }
+
+ private Endpoint createEndpoint(Pod pod) {
+ final var podIP = pod.getStatus().getPodIP();
+ if (StringUtil.isBlank(podIP)) {
+ return null;
+ }
+
+ final var targetPort = determineTargetPort(pod);
+ if (targetPort <= 0) {
+ log.warn("Could not determine target port for pod: {}",
pod.getMetadata().getName());
+ return null;
+ }
+
+ return Endpoint.of(podIP, targetPort);
+ }
+
+ private int determineTargetPort(Pod pod) {
+ if (port > 0) {
+ return port;
+ }
+
+ if (StringUtil.isNotBlank(portName) && pod.getSpec().getContainers()
!= null) {
+ return pod.getSpec().getContainers().stream()
+ .flatMap(container -> container.getPorts() != null ?
container.getPorts().stream() : null)
+ .filter(containerPort ->
portName.equals(containerPort.getName()))
+ .mapToInt(containerPort ->
containerPort.getContainerPort())
+ .findFirst()
+ .orElse(-1);
+ }
+
+ return -1;
+ }
+
+ private class PodEventHandler implements ResourceEventHandler<Pod> {
+ @Override
+ public void onAdd(Pod pod) {
+ log.debug("Pod added: {}", pod.getMetadata().getName());
+ updateEndpoints();
+ }
+
+ @Override
+ public void onUpdate(Pod oldPod, Pod newPod) {
+ log.debug("Pod updated: {}, {}", newPod.getMetadata().getName(),
newPod.getStatus());
+ updateEndpoints();
+ }
+
+ @Override
+ public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
+ log.debug("Pod deleted: {}", pod.getMetadata().getName());
+ updateEndpoints();
+ }
+ }
+
+ @Data
+ @Accessors(fluent = true)
+ public static class Builder {
+ private final KubernetesClient kubernetesClient;
+ private String namespace = "default";
+ private Map<String, String> labelSelector = new ConcurrentHashMap<>();
+ private int port = -1;
+ private String portName;
+ private EndpointSelectionStrategy selectionStrategy =
EndpointSelectionStrategy.weightedRoundRobin();
+ private String selfUid;
+
+ private Builder(KubernetesClient kubernetesClient) {
+ this.kubernetesClient = kubernetesClient;
+ }
+
+ public KubernetesLabelSelectorEndpointGroup build() {
+ if (port <= 0 && StringUtil.isBlank(portName)) {
+ throw new IllegalArgumentException("Either port or portName
must be specified");
+ }
+ if (labelSelector.isEmpty()) {
+ throw new IllegalArgumentException("Label selector must not be
empty");
+ }
+ return new KubernetesLabelSelectorEndpointGroup(this);
+ }
+ }
+}
diff --git
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
deleted file mode 100644
index ac5e9d8f6a..0000000000
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/NamespacedPodListInformer.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.client.KubernetesClientBuilder;
-import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
-import io.fabric8.kubernetes.client.informers.cache.Lister;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.isNull;
-
-@Slf4j
-public enum NamespacedPodListInformer {
-
- /**
- * contains remote collector instances
- */
- INFORMER;
-
- private Lister<Pod> podLister;
-
- public synchronized void init(ClusterModuleKubernetesConfig podConfig,
ResourceEventHandler<Pod> eventHandler) {
- try {
- final var kubernetesClient = new KubernetesClientBuilder().build();
- final var informer = kubernetesClient
- .pods()
- .inNamespace(podConfig.getNamespace())
- .withLabelSelector(podConfig.getLabelSelector())
- .inform(eventHandler);
-
- podLister = new Lister<>(informer.getIndexer());
-
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- informer.close();
- kubernetesClient.close();
- }));
- } catch (Exception e) {
- log.error("cannot connect with api server in kubernetes", e);
- }
- }
-
- public Optional<List<Pod>> listPods() {
- if (isNull(podLister)) {
- return Optional.empty();
- }
- return Optional.ofNullable(podLister.list().size() != 0
- ? podLister.list()
- .stream()
- .filter(pod ->
"Running".equalsIgnoreCase(pod.getStatus().getPhase()))
- .collect(Collectors.toList())
- : null);
- }
-}
diff --git
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
index 1f146bed23..11d5da56b1 100644
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
+++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProviderTest.java
@@ -46,10 +46,12 @@ public class ClusterModuleKubernetesProviderTest {
@BeforeEach
public void before() {
+ final var config = new ClusterModuleKubernetesConfig();
+ config.setLabelSelector("app=oap");
TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
Whitebox.setInternalState(telemetryModule, "loadedProvider",
telemetryProvider);
provider.setManager(moduleManager);
- Whitebox.setInternalState(provider, "config", new
ClusterModuleKubernetesConfig());
+ Whitebox.setInternalState(provider, "config", config);
}
@Test
diff --git
diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
deleted file mode 100644
index 4e35e9e134..0000000000
--- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ /dev/null
@@ -1,377 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
-
-import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodCondition;
-import io.fabric8.kubernetes.api.model.PodStatus;
-import lombok.Getter;
-import org.apache.skywalking.oap.server.core.CoreModule;
-import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
-import org.apache.skywalking.oap.server.core.cluster.ClusterWatcher;
-import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
-import org.apache.skywalking.oap.server.core.config.ConfigService;
-import org.apache.skywalking.oap.server.core.remote.client.Address;
-import org.apache.skywalking.oap.server.library.module.ModuleManager;
-import org.apache.skywalking.oap.server.library.module.ModuleProvider;
-import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
-import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
-import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
-import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
-import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
-import org.apache.skywalking.oap.server.telemetry.none.NoneTelemetryProvider;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.jupiter.MockitoExtension;
-import org.mockito.junit.jupiter.MockitoSettings;
-import org.mockito.quality.Strictness;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static
uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariable;
-
-@ExtendWith(MockitoExtension.class)
-@MockitoSettings(strictness = Strictness.LENIENT)
-public class KubernetesCoordinatorTest {
- public static final String LOCAL_HOST = "127.0.0.1";
- public static final String REMOTE_HOST = "127.0.0.2";
- public static final Integer GRPC_PORT = 11800;
- public static final String SELF_UID = "self";
- public static final String REMOTE_UID = "remote";
-
- @Mock
- private ModuleManager moduleManager;
- @Mock
- private NoneTelemetryProvider telemetryProvider;
- private ModuleProvider providerA;
- private Address addressA;
- private Address addressB;
- private KubernetesCoordinator coordinatorA;
- private KubernetesCoordinator coordinatorB;
- private Pod podA;
- private Pod podB;
-
- @BeforeEach
- public void prepare() {
- Mockito.when(telemetryProvider.getService(MetricsCreator.class))
- .thenReturn(new MetricsCreatorNoop());
- TelemetryModule telemetryModule = Mockito.spy(TelemetryModule.class);
- Whitebox.setInternalState(telemetryModule, "loadedProvider",
telemetryProvider);
- NamespacedPodListInformer informer =
mock(NamespacedPodListInformer.class);
- Whitebox.setInternalState(NamespacedPodListInformer.class, "INFORMER",
informer);
-
when(moduleManager.find(TelemetryModule.NAME)).thenReturn(telemetryModule);
-
when(moduleManager.find(CoreModule.NAME)).thenReturn(mock(ModuleProviderHolder.class));
-
when(moduleManager.find(CoreModule.NAME).provider()).thenReturn(mock(ModuleServiceHolder.class));
-
when(moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class)).thenReturn(
- mock(ConfigService.class));
-
when(moduleManager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort()).thenReturn(
- GRPC_PORT);
-
- addressA = new Address(LOCAL_HOST, GRPC_PORT, true);
- addressB = new Address(REMOTE_HOST, GRPC_PORT, true);
- podA = mockPod(SELF_UID, LOCAL_HOST);
- podB = mockPod(REMOTE_UID, REMOTE_HOST);
- }
-
- @Test
- public void queryRemoteNodesWhenInformerNotwork() throws Exception {
- withEnvironmentVariable(SELF_UID, SELF_UID + "0").execute(() -> {
- providerA = createProvider(SELF_UID);
- coordinatorA = getClusterCoordinator(providerA);
- coordinatorA.start();
- });
-
- KubernetesCoordinator coordinator = getClusterCoordinator(providerA);
-
doReturn(Optional.empty()).when(NamespacedPodListInformer.INFORMER).listPods();
- List<RemoteInstance> remoteInstances =
Whitebox.invokeMethod(coordinator, "queryRemoteNodes");
- Assertions.assertEquals(1, remoteInstances.size());
- Assertions.assertEquals(addressA, remoteInstances.get(0).getAddress());
- }
-
- @Test
- public void queryRemoteNodesWhenInformerWork() throws Exception {
- withEnvironmentVariable(SELF_UID + "0", SELF_UID + "0")
- .execute(() -> {
- ModuleProvider provider = createProvider(SELF_UID + "0");
- KubernetesCoordinator coordinator =
getClusterCoordinator(provider);
- coordinator.start();
-
doReturn(Optional.of(mockPodList())).when(NamespacedPodListInformer.INFORMER).listPods();
- List<RemoteInstance> remoteInstances =
Whitebox.invokeMethod(coordinator, "queryRemoteNodes");
- Assertions.assertEquals(5, remoteInstances.size());
- List<RemoteInstance> self = remoteInstances.stream()
- .filter(item -> item.getAddress().isSelf())
- .collect(Collectors.toList());
- List<RemoteInstance> others = remoteInstances.stream()
- .filter(item -> !item.getAddress().isSelf())
- .collect(Collectors.toList());
-
- Assertions.assertEquals(1, self.size());
- Assertions.assertEquals(4, others.size());
- });
- }
-
- @Test
- public void registerRemote() throws Exception {
- RemoteInstance instance = new RemoteInstance(addressA);
- withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
- providerA = createProvider(SELF_UID);
- coordinatorA = getClusterCoordinator(providerA);
- coordinatorA.start();
- });
- doReturn(Optional.of(Collections.singletonList(podA)))
- .when(NamespacedPodListInformer.INFORMER)
- .listPods();
-
- ClusterMockWatcher watcher = new ClusterMockWatcher();
- coordinatorA.registerWatcher(watcher);
- coordinatorA.registerRemote(instance);
- KubernetesCoordinator.K8sResourceEventHandler listener =
coordinatorA.new K8sResourceEventHandler();
- listener.onAdd(podA);
-
- List<RemoteInstance> remoteInstances = watcher.getRemoteInstances();
- assertEquals(1, remoteInstances.size());
- assertEquals(1, coordinatorA.queryRemoteNodes().size());
- Address queryAddress = remoteInstances.get(0).getAddress();
- assertEquals(addressA, queryAddress);
- assertTrue(queryAddress.isSelf());
- }
-
- @Test
- public void registerRemoteOfReceiver() throws Exception {
- withEnvironmentVariable(SELF_UID, SELF_UID + "0").execute(() -> {
- providerA = createProvider(SELF_UID);
- coordinatorA = getClusterCoordinator(providerA);
- coordinatorA.start();
- });
- withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
- ModuleProvider providerB = createProvider(REMOTE_UID);
- coordinatorB = getClusterCoordinator(providerB);
- });
- coordinatorB.start();
-
- ClusterMockWatcher watcherB = new ClusterMockWatcher();
- coordinatorB.registerWatcher(watcherB);
-
- doReturn(Optional.of(Collections.singletonList(podA)))
- .when(NamespacedPodListInformer.INFORMER)
- .listPods();
- RemoteInstance instance = new RemoteInstance(addressA);
- coordinatorA.registerRemote(instance);
- KubernetesCoordinator.K8sResourceEventHandler listener =
coordinatorB.new K8sResourceEventHandler();
- listener.onAdd(podA);
-
- // Receiver
- List<RemoteInstance> remoteInstances = watcherB.getRemoteInstances();
- assertEquals(1, remoteInstances.size());
- assertEquals(1, coordinatorB.queryRemoteNodes().size());
- Address queryAddress = remoteInstances.get(0).getAddress();
- assertEquals(addressA, queryAddress);
- assertFalse(queryAddress.isSelf());
- }
-
- @Test
- public void registerRemoteOfCluster() throws Exception {
- withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
- providerA = createProvider(SELF_UID);
- coordinatorA = getClusterCoordinator(providerA);
- coordinatorA.start();
- });
- withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
- ModuleProvider providerB = createProvider(REMOTE_UID);
- coordinatorB = getClusterCoordinator(providerB);
- });
- coordinatorB.start();
-
- ClusterMockWatcher watcherA = new ClusterMockWatcher();
- coordinatorA.registerWatcher(watcherA);
- ClusterMockWatcher watcherB = new ClusterMockWatcher();
- coordinatorB.registerWatcher(watcherB);
-
- doReturn(Optional.of(Arrays.asList(podA, podB)))
- .when(NamespacedPodListInformer.INFORMER)
- .listPods();
- RemoteInstance instanceA = new RemoteInstance(addressA);
- RemoteInstance instanceB = new RemoteInstance(addressB);
- coordinatorA.registerRemote(instanceA);
- coordinatorB.registerRemote(instanceB);
-
- KubernetesCoordinator.K8sResourceEventHandler listenerA =
coordinatorA.new K8sResourceEventHandler();
- listenerA.onAdd(podA);
- listenerA.onAdd(podB);
- KubernetesCoordinator.K8sResourceEventHandler listenerB =
coordinatorB.new K8sResourceEventHandler();
- listenerB.onAdd(podA);
- listenerB.onAdd(podB);
-
- List<RemoteInstance> remoteInstancesOfA =
watcherA.getRemoteInstances();
- validateServiceInstance(addressA, addressB, remoteInstancesOfA);
- assertEquals(2, coordinatorA.queryRemoteNodes().size());
-
- List<RemoteInstance> remoteInstancesOfB =
watcherB.getRemoteInstances();
- validateServiceInstance(addressB, addressA, remoteInstancesOfB);
- assertEquals(2, coordinatorB.queryRemoteNodes().size());
- }
-
- @Test
- public void deregisterRemoteOfCluster() throws Exception {
- withEnvironmentVariable(SELF_UID, SELF_UID).execute(() -> {
- providerA = createProvider(SELF_UID);
- coordinatorA = getClusterCoordinator(providerA);
- coordinatorA.start();
- });
- withEnvironmentVariable(REMOTE_UID, REMOTE_UID).execute(() -> {
- ModuleProvider providerB = createProvider(REMOTE_UID);
- coordinatorB = getClusterCoordinator(providerB);
- });
- coordinatorB.start();
-
- ClusterMockWatcher watcherA = new ClusterMockWatcher();
- coordinatorA.registerWatcher(watcherA);
-
- ClusterMockWatcher watcherB = new ClusterMockWatcher();
- coordinatorB.registerWatcher(watcherB);
-
- doReturn(Optional.of(Arrays.asList(podA, podB)))
- .when(NamespacedPodListInformer.INFORMER)
- .listPods();
- RemoteInstance instanceA = new RemoteInstance(addressA);
- RemoteInstance instanceB = new RemoteInstance(addressB);
- coordinatorA.registerRemote(instanceA);
- coordinatorB.registerRemote(instanceB);
-
- KubernetesCoordinator.K8sResourceEventHandler listenerA =
coordinatorA.new K8sResourceEventHandler();
- listenerA.onAdd(podA);
- listenerA.onAdd(podB);
- KubernetesCoordinator.K8sResourceEventHandler listenerB =
coordinatorB.new K8sResourceEventHandler();
- listenerB.onAdd(podA);
- listenerB.onAdd(podB);
-
- List<RemoteInstance> remoteInstancesOfA =
watcherA.getRemoteInstances();
- validateServiceInstance(addressA, addressB, remoteInstancesOfA);
- assertEquals(2, coordinatorA.queryRemoteNodes().size());
-
- List<RemoteInstance> remoteInstancesOfB =
watcherB.getRemoteInstances();
- validateServiceInstance(addressB, addressA, remoteInstancesOfB);
- assertEquals(2, coordinatorB.queryRemoteNodes().size());
-
- // deregister A
- listenerB.onDelete(podA, false);
- doReturn(Optional.of(Collections.singletonList(podB)))
- .when(NamespacedPodListInformer.INFORMER)
- .listPods();
- // only B
- remoteInstancesOfB = watcherB.getRemoteInstances();
- assertEquals(1, remoteInstancesOfB.size());
- assertEquals(1, coordinatorB.queryRemoteNodes().size());
-
- Address address = remoteInstancesOfB.get(0).getAddress();
- assertEquals(addressB, address);
- assertTrue(address.isSelf());
- }
-
- private ClusterModuleKubernetesProvider createProvider(String uidEnvName) {
- ClusterModuleKubernetesProvider provider = new
ClusterModuleKubernetesProvider();
-
- ClusterModuleKubernetesConfig config = new
ClusterModuleKubernetesConfig();
- provider.newConfigCreator().onInitialized(config);
- config.setNamespace("default");
- config.setLabelSelector("app=oap");
- config.setUidEnvName(uidEnvName);
-
- provider.setManager(moduleManager);
- provider.prepare();
- provider.start();
- provider.notifyAfterCompleted();
- return provider;
- }
-
- private KubernetesCoordinator getClusterCoordinator(ModuleProvider
provider) {
- return (KubernetesCoordinator)
provider.getService(ClusterCoordinator.class);
- }
-
- private Pod mockPod(String uid, String ip) {
- Pod v1Pod = new Pod();
- v1Pod.setMetadata(new ObjectMeta());
- v1Pod.setStatus(new PodStatus());
- v1Pod.getStatus().setPhase("Running");
- v1Pod.getMetadata().setUid(uid);
- v1Pod.getStatus().setPodIP(ip);
- v1Pod.getStatus().setConditions(List.of(new PodCondition("", "", "",
"", "True", "Ready")));
- return v1Pod;
- }
-
- private List<Pod> mockPodList() {
- List<Pod> pods = new ArrayList<>();
- for (int i = 0; i < 5; i++) {
- Pod v1Pod = new Pod();
- v1Pod.setMetadata(new ObjectMeta());
- v1Pod.setStatus(new PodStatus());
- v1Pod.getMetadata().setUid(SELF_UID + i);
- v1Pod.getStatus().setPodIP(LOCAL_HOST);
- v1Pod.getStatus().setConditions(List.of(new PodCondition("", "",
"", "", "True", "Ready")));
- pods.add(v1Pod);
- }
- return pods;
- }
-
- private void validateServiceInstance(Address selfAddress, Address
otherAddress,
- List<RemoteInstance> queryResult) {
- assertEquals(2, queryResult.size());
-
- boolean selfExist = false, otherExist = false;
-
- for (RemoteInstance instance : queryResult) {
- Address queryAddress = instance.getAddress();
- if (queryAddress.equals(selfAddress) && queryAddress.isSelf()) {
- selfExist = true;
- } else if (queryAddress.equals(otherAddress) &&
!queryAddress.isSelf()) {
- otherExist = true;
- }
- }
-
- assertTrue(selfExist);
- assertTrue(otherExist);
- }
-
- static class ClusterMockWatcher implements ClusterWatcher {
- @Getter
- private List<RemoteInstance> remoteInstances = new ArrayList<>();
-
- @Override
- public void onClusterNodesChanged(final List<RemoteInstance>
remoteInstances) {
- this.remoteInstances = remoteInstances;
- }
- }
-}