This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch k8s-Coordinator-role in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
commit 8c8f39d0f5c5f347b628ff55ca368f6606acec0d Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Mar 11 20:52:44 2019 +0800 Make k8s-Coordinator back in role mode. --- .../ClusterModuleKubernetesProvider.java | 2 +- .../plugin/kubernetes/KubernetesCoordinator.java | 26 ++++++++++++---- .../kubernetes/KubernetesCoordinatorTest.java | 8 ++--- .../oap/server/core/CoreModuleConfig.java | 2 +- .../oap/server/core/CoreModuleProvider.java | 3 +- .../oap/server/core/config/gRPCConfigService.java | 36 ++++++++++++++++++++++ .../oap/server/core/remote/client/Address.java | 4 +-- 7 files changed, 66 insertions(+), 15 deletions(-) diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java index f1d0d07..4545e84 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/ClusterModuleKubernetesProvider.java @@ -53,7 +53,7 @@ public class ClusterModuleKubernetesProvider extends ModuleProvider { } @Override public void prepare() throws ServiceNotProvidedException { - KubernetesCoordinator coordinator = new KubernetesCoordinator( + KubernetesCoordinator coordinator = new KubernetesCoordinator(getManager(), new NamespacedPodListWatch(config.getNamespace(), config.getLabelSelector(), config.getWatchTimeoutSeconds()), new UidEnvSupplier(config.getUidEnvName())); this.registerServiceImplementation(ClusterRegister.class, coordinator); diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index 730f4b2..3d6dee5 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -24,14 +24,17 @@ import java.util.*; import java.util.concurrent.*; import java.util.function.Supplier; import javax.annotation.Nullable; +import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.cluster.*; +import org.apache.skywalking.oap.server.core.config.gRPCConfigService; import org.apache.skywalking.oap.server.core.remote.client.Address; +import org.apache.skywalking.oap.server.library.module.ModuleManager; import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext; import org.slf4j.*; /** - * Read collector pod info from api-server of kubernetes, then using all containerIp list to - * construct the list of {@link RemoteInstance}. + * Read collector pod info from api-server of kubernetes, then using all containerIp list to construct the list of + * {@link RemoteInstance}. * * @author gaohongtao */ @@ -39,24 +42,28 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class); + private final ModuleManager manager; + private final String uid; private final Map<String, RemoteInstance> cache = new ConcurrentHashMap<>(); private final ReusableWatch<Event> watch; - private int port; + private volatile int port = -1; - KubernetesCoordinator(final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) { + KubernetesCoordinator(ModuleManager manager, + final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) { + this.manager = manager; this.watch = watch; this.uid = uidSupplier.get(); TelemetryRelatedContext.INSTANCE.setId(uid); + submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build()))); } @Override public void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException { this.port = remoteInstance.getAddress().getPort(); - submitTask(MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setDaemon(true).setNameFormat("Kubernetes-ApiServer-%s").build()))); } private void submitTask(final ListeningExecutorService service) { @@ -102,6 +109,13 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery } @Override public List<RemoteInstance> queryRemoteNodes() { + if (port == -1) { + logger.debug("Query kubernetes remote, port hasn't init, try to init"); + gRPCConfigService service = manager.find(CoreModule.NAME).provider().getService(gRPCConfigService.class); + port = service.getPort(); + logger.debug("Query kubernetes remote, port is set at {}", port); + cache.values().forEach(instance -> instance.getAddress().setPort(port)); + } logger.debug("Query kubernetes remote nodes: {}", cache); return Lists.newArrayList(cache.values()); } diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java index 4fd2699..78eb756 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java @@ -33,7 +33,7 @@ public class KubernetesCoordinatorTest { @Test public void assertAdded() throws InterruptedException { PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2"); - coordinator = new KubernetesCoordinator(watch, () -> "1"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); @@ -43,7 +43,7 @@ public class KubernetesCoordinatorTest { @Test public void assertModified() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3"); - coordinator = new KubernetesCoordinator(watch, () -> "1"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); @@ -53,7 +53,7 @@ public class KubernetesCoordinatorTest { @Test public void assertDeleted() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2"); - coordinator = new KubernetesCoordinator(watch, () -> "1"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(1)); @@ -63,7 +63,7 @@ public class KubernetesCoordinatorTest { @Test public void assertError() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2"); - coordinator = new KubernetesCoordinator(watch, () -> "1"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index 35e3b49..31b59cd 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -27,7 +27,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; */ @Getter public class CoreModuleConfig extends ModuleConfig { - @Setter private String role; + @Setter private String role = "Mixed"; @Setter private String nameSpace; @Setter private String restHost; @Setter private int restPort; diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 52063c0..222afea 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -106,6 +106,7 @@ public class CoreModuleProvider extends ModuleProvider { jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors()); jettyServer.initialize(); + this.registerServiceImplementation(gRPCConfigService.class, new gRPCConfigService(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort())); this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling())); this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); @@ -176,7 +177,7 @@ public class CoreModuleProvider extends ModuleProvider { throw new ModuleStartException(e.getMessage(), e); } - if (CoreModuleConfig.Role.Mixed.name().equals(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equals(moduleConfig.getRole())) { + if (CoreModuleConfig.Role.Mixed.name().equalsIgnoreCase(moduleConfig.getRole()) || CoreModuleConfig.Role.Aggregator.name().equalsIgnoreCase(moduleConfig.getRole())) { RemoteInstance gRPCServerInstance = new RemoteInstance(new Address(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort(), true)); this.getManager().find(ClusterModule.NAME).provider().getService(ClusterRegister.class).registerRemote(gRPCServerInstance); } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java new file mode 100644 index 0000000..d5a1d15 --- /dev/null +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/config/gRPCConfigService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.core.config; + +import lombok.Getter; +import org.apache.skywalking.oap.server.library.module.Service; + +/** + * @author wusheng + */ +@Getter +public class gRPCConfigService implements Service { + private String host; + private int port; + + public gRPCConfigService(String host, int port) { + this.host = host; + this.port = port; + } +} diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java index 4e5d0ce..4c4b8da 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/Address.java @@ -27,8 +27,8 @@ import org.apache.skywalking.oap.server.core.Const; @Getter public class Address implements Comparable<Address> { private final String host; - private final int port; - @Setter private boolean isSelf; + @Setter private volatile int port; + @Setter private volatile boolean isSelf; public Address(String host, int port, boolean isSelf) { this.host = host;