This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch k8s-Coordinator-role in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push: new 11dd661 Implement in an easier way. 11dd661 is described below commit 11dd661e645b267e6745301fc6afeefa7fe608be Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Mar 11 21:44:07 2019 +0800 Implement in an easier way. --- .../plugin/kubernetes/KubernetesCoordinator.java | 35 +++++++--------------- .../oap/server/core/remote/client/Address.java | 2 +- 2 files changed, 11 insertions(+), 26 deletions(-) diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index 7e7b456..0485b47 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -18,11 +18,9 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes; -import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.skywalking.oap.server.core.CoreModule; @@ -51,8 +49,6 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private final ReusableWatch<Event> watch; - private final ReentrantLock portSetLock; - private volatile int port = -1; KubernetesCoordinator(ModuleDefineHolder manager, @@ -61,7 +57,6 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery this.watch = watch; this.uid = uidSupplier.get(); TelemetryRelatedContext.INSTANCE.setId(uid); - this.portSetLock = new ReentrantLock(); } public void start() { @@ -104,16 +99,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery switch (event.getType()) { case "ADDED": case "MODIFIED": - if (port == -1) { - portSetLock.lock(); - try { - cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); - } finally { - portSetLock.unlock(); - } - } else { - cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); - } + cache.put(event.getUid(), new RemoteInstance(new Address(event.getHost(), port, event.getUid().equals(this.uid)))); break; case "DELETED": cache.remove(event.getUid()); @@ -125,20 +111,19 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery } @Override public List<RemoteInstance> queryRemoteNodes() { - if (port == -1) { - // Use lock mechanism to avoid concurrency conflict with `generateRemoteNodes`. - portSetLock.lock(); - try { + final List<RemoteInstance> list = new ArrayList<>(); + cache.values().forEach(instance -> { + Address address = instance.getAddress(); + if (port == -1) { logger.debug("Query kubernetes remote, port hasn't init, try to init"); ConfigService service = manager.find(CoreModule.NAME).provider().getService(ConfigService.class); port = service.getGRPCPort(); logger.debug("Query kubernetes remote, port is set at {}", port); - cache.values().forEach(instance -> instance.getAddress().setPort(port)); - } finally { - portSetLock.unlock(); } - } - logger.debug("Query kubernetes remote nodes: {}", cache); - return Lists.newArrayList(cache.values()); + list.add(new RemoteInstance(new Address(address.getHost(), port, address.isSelf()))); + }); + + logger.debug("Query kubernetes remote nodes: {}", list); + return list; } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java index 4c4b8da..00c36cf 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java @@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.core.Const; @Getter public class Address implements Comparable<Address> { private final String host; - @Setter private volatile int port; + private final int port; @Setter private volatile boolean isSelf; public Address(String host, int port, boolean isSelf) {