This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch k8s-Coordinator-role in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push: new a84cb5b Do lock when port has been intialized. a84cb5b is described below commit a84cb5b187e0ef50558a1b529905861cec95270c Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Mar 11 21:00:44 2019 +0800 Do lock when port has been intialized. --- .../plugin/kubernetes/KubernetesCoordinator.java | 31 +++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index 14e0f91..c91bd9a 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.skywalking.oap.server.core.CoreModule; @@ -50,6 +51,8 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private final ReusableWatch<Event> watch; + private final ReentrantLock portSetLock; + private volatile int port = -1; KubernetesCoordinator(ModuleManager manager, @@ -58,6 +61,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery this.watch = watch; this.uid = uidSupplier.get(); TelemetryRelatedContext.INSTANCE.setId(uid); + this.portSetLock = new ReentrantLock(); } public void start() { @@ -100,7 +104,16 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery switch (event.getType()) { case "ADDED": case "MODIFIED": - cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); + if (port == -1) { + portSetLock.lock(); + try { + cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); + } finally { + portSetLock.unlock(); + } + } else { + cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); + } break; case "DELETED": cache.remove(event.getUid()); @@ -113,11 +126,17 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery @Override public List<RemoteInstance> queryRemoteNodes() { if (port == -1) { - logger.debug("Query kubernetes remote, port hasn't init, try to init"); - gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class); - port = service.getPort(); - logger.debug("Query kubernetes remote, port is set at {}", port); - cache.values().forEach(instance -> instance.getAddress().setPort(port)); + // Use lock mechanism to avoid concurrency conflict with `generateRemoteNodes`. + portSetLock.lock(); + try { + logger.debug("Query kubernetes remote, port hasn't init, try to init"); + gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class); + port = service.getPort(); + logger.debug("Query kubernetes remote, port is set at {}", port); + cache.values().forEach(instance -> instance.getAddress().setPort(port)); + } finally { + portSetLock.unlock(); + } } logger.debug("Query kubernetes remote nodes: {}", cache); return Lists.newArrayList(cache.values());