This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch k8s-Coordinator-role in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/k8s-Coordinator-role by this push: new e46134a Fix text cases. e46134a is described below commit e46134a12f328452a0b1678ea366dc80397ee04a Author: Wu Sheng <wu.sh...@foxmail.com> AuthorDate: Mon Mar 11 21:23:57 2019 +0800 Fix text cases. --- .../cluster-kubernetes-plugin/pom.xml | 6 +++ .../plugin/kubernetes/KubernetesCoordinator.java | 6 +-- .../kubernetes/KubernetesCoordinatorTest.java | 51 ++++++++++++++++++++++ 3 files changed, 60 insertions(+), 3 deletions(-) diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml index 2eb1592..af12c92 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/pom.xml @@ -38,5 +38,11 @@ <groupId>io.kubernetes</groupId> <artifactId>client-java</artifactId> </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>server-testing</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> \ No newline at end of file diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java index af11b06..7e7b456 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java @@ -29,7 +29,7 @@ import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.cluster.*; import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.remote.client.Address; -import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; import org.apache.skywalking.oap.server.telemetry.api.TelemetryRelatedContext; import org.slf4j.*; @@ -43,7 +43,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private static final Logger logger = LoggerFactory.getLogger(KubernetesCoordinator.class); - private final ModuleManager manager; + private final ModuleDefineHolder manager; private final String uid; @@ -55,7 +55,7 @@ public class KubernetesCoordinator implements ClusterRegister, ClusterNodesQuery private volatile int port = -1; - KubernetesCoordinator(ModuleManager manager, + KubernetesCoordinator(ModuleDefineHolder manager, final ReusableWatch<Event> watch, final Supplier<String> uidSupplier) { this.manager = manager; this.watch = watch; diff --git a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java index 78eb756..982f4eb 100644 --- a/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java +++ b/oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/test/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinatorTest.java @@ -19,12 +19,18 @@ package org.apache.skywalking.oap.server.cluster.plugin.kubernetes; import org.apache.skywalking.oap.server.cluster.plugin.kubernetes.fixture.PlainWatch; +import org.apache.skywalking.oap.server.core.*; import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.core.config.ConfigService; import org.apache.skywalking.oap.server.core.remote.client.Address; +import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder; +import org.apache.skywalking.oap.server.testing.module.*; import org.junit.Test; +import org.mockito.Mockito; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; public class KubernetesCoordinatorTest { @@ -34,6 +40,7 @@ public class KubernetesCoordinatorTest { public void assertAdded() throws InterruptedException { PlainWatch watch = PlainWatch.create(2, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2"); coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); @@ -44,6 +51,7 @@ public class KubernetesCoordinatorTest { public void assertModified() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3"); coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); @@ -54,6 +62,7 @@ public class KubernetesCoordinatorTest { public void assertDeleted() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2"); coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(1)); @@ -64,9 +73,51 @@ public class KubernetesCoordinatorTest { public void assertError() throws InterruptedException { PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2"); coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); coordinator.registerRemote(new RemoteInstance(new Address("0.0.0.0", 8454, true))); watch.await(); assertThat(coordinator.queryRemoteNodes().size(), is(2)); assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1")); } + + @Test + public void assertModifiedInReceiverRole() throws InterruptedException { + PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "MODIFIED", "1", "10.0.0.3"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); + watch.await(); + assertThat(coordinator.queryRemoteNodes().size(), is(2)); + assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.3")); + } + + @Test + public void assertDeletedInReceiverRole() throws InterruptedException { + PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ADDED", "2", "10.0.0.2", "DELETED", "2", "10.0.0.2"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); + watch.await(); + assertThat(coordinator.queryRemoteNodes().size(), is(1)); + assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1")); + } + + @Test + public void assertErrorInReceiverRole() throws InterruptedException { + PlainWatch watch = PlainWatch.create(3, "ADDED", "1", "10.0.0.1", "ERROR", "X", "10.0.0.2", "ADDED", "2", "10.0.0.2"); + coordinator = new KubernetesCoordinator(getManager(), watch, () -> "1"); + coordinator.start(); + watch.await(); + assertThat(coordinator.queryRemoteNodes().size(), is(2)); + assertThat(coordinator.queryRemoteNodes().stream().filter(instance -> instance.getAddress().isSelf()).findFirst().get().getAddress().getHost(), is("10.0.0.1")); + } + + public ModuleDefineHolder getManager() { + ModuleManagerTesting moduleManagerTesting = new ModuleManagerTesting(); + ModuleDefineTesting coreModuleDefine = new ModuleDefineTesting(); + moduleManagerTesting.put(CoreModule.NAME, coreModuleDefine); + CoreModuleConfig config = Mockito.mock(CoreModuleConfig.class); + when(config.getGRPCHost()).thenReturn("127.0.0.1"); + when(config.getGRPCPort()).thenReturn(8454); + coreModuleDefine.provider().registerServiceImplementation(ConfigService.class, new ConfigService(config)); + return moduleManagerTesting; + } } \ No newline at end of file