This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch k8s-Coordinator-role
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit 8c8f39d0f5c5f347b628ff55ca368f6606acec0d
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Mon Mar 11 20:52:44 2019 +0800

    Make k8s-Coordinator back in role mode.
---
 .../ClusterModuleKubernetesProvider.java           |  2 +-
 .../plugin/kubernetes/KubernetesCoordinator.java   | 26 ++++++++++++----
 .../kubernetes/KubernetesCoordinatorTest.java      |  8 ++---
 .../oap/server/core/CoreModuleConfig.java          |  2 +-
 .../oap/server/core/CoreModuleProvider.java        |  3 +-
 .../oap/server/core/config/gRPCConfigService.java  | 36 ++++++++++++++++++++++
 .../oap/server/core/remote/client/Address.java     |  4 +--
 7 files changed, 66 insertions(+), 15 deletions(-)

diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
index f1d0d07..4545e84 100644
--- 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java
@@ -53,7 +53,7 @@ public class ClusterModuleKubernetesProvider extends 
ModuleProvider {
     }
 
     @Override public void prepare() throws ServiceNotProvidedException {
-        KubernetesCoordinator coordinator = new KubernetesCoordinator(
+        KubernetesCoordinator coordinator = new 
KubernetesCoordinator(getManager(),
             new NamespacedPodListWatch(config.getNamespace(), 
config.getLabelSelector(), config.getWatchTimeoutSeconds()),
             new UidEnvSupplier(config.getUidEnvName()));
         this.registerServiceImplementation(ClusterRegister.class, coordinator);
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
index 730f4b2..3d6dee5 100644
--- 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java
@@ -24,14 +24,17 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
+import org.apache.skywalking.oap.server.core.CoreModule;
 import org.apache.skywalking.oap.server.core.cluster.*;
+import org.apache.skywalking.oap.server.core.config.gRPCConfigService;
 import org.apache.skywalking.oap.server.core.remote.client.Address;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
 import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext;
 import org.slf4j.*;
 
 /**
- * Read collector pod info from api-server of kubernetes, then using all 
containerIp list to
- * construct the list of {@link RemoteInstance}.
+ * Read collector pod info from api-server of kubernetes, then using all 
containerIp list to construct the list of
+ * {@link RemoteInstance}.
  *
  * @author gaohongtao
  */
@@ -39,24 +42,28 @@ public class KubernetesCoordinator implements 
ClusterRegister, ClusterNodesQuery
 
     private static final Logger logger = 
LoggerFactory.getLogger(KubernetesCoordinator.class);
 
+    private final ModuleManager manager;
+
     private final String uid;
 
     private final Map<String, RemoteInstance> cache = new 
ConcurrentHashMap<>();
 
     private final ReusableWatch<Event> watch;
 
-    private int port;
+    private volatile int port = -1;
 
-    KubernetesCoordinator(final ReusableWatch<Event> watch, final 
Supplier<String> uidSupplier) {
+    KubernetesCoordinator(ModuleManager manager,
+        final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) {
+        this.manager = manager;
         this.watch = watch;
         this.uid = uidSupplier.get();
         TelemetryRelatedContext.INSTANCE.setId(uid);
+        
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
 ThreadFactoryBuilder()
+            
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
 
     @Override public void registerRemote(RemoteInstance remoteInstance) throws 
ServiceRegisterException {
         this.port = remoteInstance.getAddress().getPort();
-        
submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new
 ThreadFactoryBuilder()
-            
.setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build())));
     }
 
     private void submitTask(final ListeningExecutorService service) {
@@ -102,6 +109,13 @@ public class KubernetesCoordinator implements 
ClusterRegister, ClusterNodesQuery
     }
 
     @Override public List<RemoteInstance> queryRemoteNodes() {
+        if (port == -1) {
+            logger.debug("Query kubernetes remote, port hasn't init, try to 
init");
+            gRPCConfigService service = 
manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class);
+            port = service.getPort();
+            logger.debug("Query kubernetes remote, port is set at {}", port);
+            cache.values().forEach(instance -> 
instance.getAddress().setPort(port));
+        }
         logger.debug("Query kubernetes remote nodes: {}", cache);
         return Lists.newArrayList(cache.values());
     }
diff --git 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
index 4fd2699..78eb756 100644
--- 
a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
+++ 
b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java
@@ -33,7 +33,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertAdded() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> 
"1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 
8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -43,7 +43,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertModified() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> 
"1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 
8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
@@ -53,7 +53,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertDeleted() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> 
"1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 
8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(1));
@@ -63,7 +63,7 @@ public class KubernetesCoordinatorTest {
     @Test
     public void assertError() throws InterruptedException {
         PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", 
"ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2");
-        coordinator = new KubernetesCoordinator(watch, () -> "1");
+        coordinator = new KubernetesCoordinator(getManager(), watch, () -> 
"1");
         coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 
8454, true)));
         watch.await();
         assertThat(coordinator.queryRemoteNodes().size(), is(2));
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 35e3b49..31b59cd 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -27,7 +27,7 @@ import 
org.apache.skywalking.oap.server.library.module.ModuleConfig;
  */
 @Getter
 public class CoreModuleConfig extends ModuleConfig {
-    @Setter private String role;
+    @Setter private String role = "Mixed";
     @Setter private String nameSpace;
     @Setter private String restHost;
     @Setter private int restPort;
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 52063c0..222afea 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider {
         jettyServer = new JettyServer(moduleConfig.getRestHost(), 
moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), 
moduleConfig.getJettySelectors());
         jettyServer.initialize();
 
+        this.registerServiceImplementation(gRPCConfigService.class, new 
gRPCConfigService(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort()));
         this.registerServiceImplementation(DownsamplingConfigService.class, 
new DownsamplingConfigService(moduleConfig.getDownsampling()));
 
         this.registerServiceImplementation(GRPCHandlerRegister.class, new 
GRPCHandlerRegisterImpl(grpcServer));
@@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider {
             throw new ModuleStartException(e.getMessage(), e);
         }
 
-        if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) 
|| CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) {
+        if 
(CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || 
CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole()))
 {
             RemoteInstance gRPCServerInstance = new RemoteInstance(new 
Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true));
             
this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance);
         }
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java
new file mode 100644
index 0000000..d5a1d15
--- /dev/null
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.config;
+
+import lombok.Getter;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+/**
+ * @author wusheng
+ */
+@Getter
+public class gRPCConfigService implements Service {
+    private String host;
+    private int port;
+
+    public gRPCConfigService(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+}
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
index 4e5d0ce..4c4b8da 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java
@@ -27,8 +27,8 @@ import org.apache.skywalking.oap.server.core.Const;
 @Getter
 public class Address implements Comparable<Address> {
     private final String host;
-    private final int port;
-    @Setter private boolean isSelf;
+    @Setter private volatile int port;
+    @Setter private volatile boolean isSelf;
 
     public Address(String host, int port, boolean isSelf) {
         this.host = host;

Reply via email to