This is an automated email from the ASF dual-hosted git repository. klund pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 8ae401e4c526a36b022bd0de708a9f2437cdde54 Author: Kirk Lund <kl...@apache.org> AuthorDate: Fri Jan 8 13:25:28 2021 -0800 GEODE-8696: Fix synchronization in FederatingManager (#5728) Prevent hang while protecting against removal of member artifacts during startup of manager. (cherry picked from commit 42726bd99c9ebb57ec5d3e987d47ecae9f4ea3a7) --- .../geode/management/CacheManagementDUnitTest.java | 2 +- .../DistributedSystemMXBeanDistributedTest.java | 26 +- .../JmxLocatorReconnectDistributedTest.java | 299 ++++++++++++++++++ ...java => JmxServerReconnectDistributedTest.java} | 294 +++++++++--------- ...BeanFederationErrorHandlingDistributedTest.java | 2 +- ...ederatingManagerConcurrencyIntegrationTest.java | 2 +- .../internal/FederatingManagerIntegrationTest.java | 92 ------ .../geode/logging/internal/LoggingSession.java | 4 +- .../management/internal/FederatingManager.java | 321 +++++++++++-------- .../geode/management/internal/LocalManager.java | 29 +- .../apache/geode/management/internal/Manager.java | 29 +- .../management/internal/ManagerLifecycle.java | 36 +++ .../management/internal/ManagerMembership.java | 43 +++ .../internal/beans/DistributedSystemBridge.java | 3 + .../management/internal/FederatingManagerTest.java | 343 ++++++++++++++++----- 15 files changed, 1030 insertions(+), 495 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java index 4736161..7a1d52a 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/CacheManagementDUnitTest.java @@ -387,7 +387,7 @@ public class CacheManagementDUnitTest implements Serializable { for (DistributedMember member : otherMembers) { Set<ObjectName> proxyNames = - service.getFederatingManager().getProxyFactory().findAllProxies(member); + service.getFederatingManager().proxyFactory().findAllProxies(member); assertThat(proxyNames).isEmpty(); ObjectName proxyMBeanName = service.getMemberMBeanName(member); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java index 555c981..b25fc7a 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/DistributedSystemMXBeanDistributedTest.java @@ -114,15 +114,18 @@ public class DistributedSystemMXBeanDistributedTest implements Serializable { public void getMemberCount() { // 1 manager, 3 members, 1 dunit locator managerVM.invoke(() -> { - await() - .untilAsserted(() -> assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5)); + await().untilAsserted(() -> { + assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5); + }); }); } @Test public void showJVMMetrics() { managerVM.invoke(() -> { - await().until(() -> distributedSystemMXBean.getMemberCount() == 5); + await().untilAsserted(() -> { + assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5); + }); for (DistributedMember member : getOtherNormalMembers()) { assertThat(distributedSystemMXBean.showJVMMetrics(member.getName())).isNotNull(); @@ -133,7 +136,9 @@ public class DistributedSystemMXBeanDistributedTest implements Serializable { @Test public void showOSMetrics() { managerVM.invoke(() -> { - await().until(() -> distributedSystemMXBean.getMemberCount() == 5); + await().untilAsserted(() -> { + assertThat(distributedSystemMXBean.getMemberCount()).isEqualTo(5); + }); Set<InternalDistributedMember> otherMembers = getOtherNormalMembers(); for (DistributedMember member : otherMembers) { @@ -147,15 +152,18 @@ public class DistributedSystemMXBeanDistributedTest implements Serializable { managerVM.invoke(() -> { distributedSystemMXBean.shutDownAllMembers(); - await().untilAsserted(() -> assertThat(getOtherNormalMembers()).hasSize(0)); + await().untilAsserted(() -> { + assertThat(getOtherNormalMembers()).hasSize(0); + }); }); } @Test public void listMemberObjectNames() { managerVM.invoke(() -> { - await().untilAsserted( - () -> assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4)); + await().untilAsserted(() -> { + assertThat(distributedSystemMXBean.listMemberObjectNames()).hasSize(4); + }); }); } @@ -164,7 +172,9 @@ public class DistributedSystemMXBeanDistributedTest implements Serializable { managerVM.invoke(() -> { String memberName = distributedMember.getName(); - await().until(() -> distributedSystemMXBean.fetchMemberObjectName(memberName) != null); + await().untilAsserted(() -> { + assertThat(distributedSystemMXBean.fetchMemberObjectName(memberName)).isNotNull(); + }); ObjectName memberMXBeanName = distributedSystemMXBean.fetchMemberObjectName(memberName); assertThat(memberMXBeanName).isEqualTo(getMemberMBeanName(memberName)); diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java new file mode 100644 index 0000000..9d5678e --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxLocatorReconnectDistributedTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management; + +import static java.lang.management.ManagementFactory.getPlatformMBeanServer; +import static java.util.Arrays.asList; +import static javax.management.ObjectName.getInstance; +import static org.apache.geode.cache.Region.SEPARATOR; +import static org.apache.geode.distributed.ConfigurationProperties.HTTP_SERVICE_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_FILE; +import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT; +import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT; +import static org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper.crashDistributedSystem; +import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; +import static org.apache.geode.test.dunit.VM.getVM; +import static org.apache.geode.test.dunit.VM.getVMId; +import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; + +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.QueryExp; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.CancelException; +import org.apache.geode.ForcedDisconnectException; +import org.apache.geode.alerting.internal.spi.AlertingIOException; +import org.apache.geode.cache.CacheClosedException; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.LocatorLauncher; +import org.apache.geode.distributed.ServerLauncher; +import org.apache.geode.distributed.internal.InternalLocator; +import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedException; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.DistributedErrorCollector; +import org.apache.geode.test.dunit.rules.DistributedReference; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.dunit.rules.DistributedRule; +import org.apache.geode.test.junit.categories.JMXTest; +import org.apache.geode.test.junit.rules.GfshCommandRule; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; + +@Category(JMXTest.class) +@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"}) +public class JmxLocatorReconnectDistributedTest implements Serializable { + + private static final String LOCATOR_NAME = "locator"; + private static final String SERVER_NAME = "server"; + private static final String REGION_NAME = "region"; + private static final QueryExp QUERY_ALL = null; + + private static final ObjectName GEMFIRE_MXBEANS = + execute(() -> getInstance("GemFire:*")); + private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS = + execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME)); + private static final Set<ObjectName> EXPECTED_LOCATOR_MXBEANS = + execute(() -> expectedLocatorMXBeans(LOCATOR_NAME)); + private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS = + execute(() -> expectedDistributedMXBeans(REGION_NAME)); + + private VM locatorVM; + private VM serverVM; + + private String locators; + private int locatorPort; + private int locatorJmxPort; + private Set<ObjectName> mxbeansOnServer; + private Set<ObjectName> mxbeansOnLocator; + + @Rule + public DistributedRule distributedRule = new DistributedRule(2); + @Rule + public DistributedErrorCollector errorCollector = new DistributedErrorCollector(); + @Rule + public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>(); + @Rule + public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>(); + @Rule + public DistributedRestoreSystemProperties restoreProps = new DistributedRestoreSystemProperties(); + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + @Before + public void setUp() throws Exception { + locatorVM = getVM(0); + serverVM = getVM(1); + + File locatorDir = temporaryFolder.newFolder(LOCATOR_NAME); + File serverDir = temporaryFolder.newFolder(SERVER_NAME); + + for (VM vm : asList(locatorVM, serverVM)) { + vm.invoke(() -> System.setProperty(GEMFIRE_PREFIX + "standard-output-always-on", "true")); + } + + int[] port = getRandomAvailableTCPPorts(2); + locatorPort = port[0]; + locatorJmxPort = port[1]; + locators = "localhost[" + locatorPort + "]"; + + locatorVM.invoke(() -> { + locatorLauncher.set(startLocator(locatorDir, locatorPort, locatorJmxPort, locators)); + }); + + serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators))); + + gfsh.connectAndVerify(locatorJmxPort, GfshCommandRule.PortType.jmxManager); + + String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME; + gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess(); + + addIgnoredException(AlertingIOException.class); + addIgnoredException(CacheClosedException.class); + addIgnoredException(CancelException.class); + addIgnoredException(DistributedSystemDisconnectedException.class); + addIgnoredException(ForcedDisconnectException.class); + addIgnoredException(MemberDisconnectedException.class); + addIgnoredException("Possible loss of quorum"); + + mxbeansOnServer = serverVM.invoke(() -> { + await().untilAsserted(() -> { + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsAll(EXPECTED_SERVER_MXBEANS); + }); + return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); + }); + + mxbeansOnLocator = locatorVM.invoke(() -> { + await().untilAsserted(() -> { + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator") + .containsAll(EXPECTED_SERVER_MXBEANS) + .containsAll(EXPECTED_LOCATOR_MXBEANS) + .containsAll(EXPECTED_DISTRIBUTED_MXBEANS); + }); + return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); + }); + } + + @Test + public void serverMXBeanOnServerIsUnaffectedByLocatorCrash() { + locatorVM.invoke(() -> { + crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem()); + + await().untilAsserted(() -> { + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator") + .isEmpty(); + }); + }); + + serverVM.invoke(() -> { + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsExactlyInAnyOrderElementsOf(mxbeansOnServer); + }); + + locatorVM.invoke(() -> { + InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator(); + + await().untilAsserted(() -> { + boolean isReconnected = locator.isReconnected(); + boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning(); + Set<ObjectName> mbeanNames = + getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); + + assertThat(isReconnected) + .as("Locator is reconnected on locator") + .isTrue(); + assertThat(isSharedConfigurationRunning) + .as("Locator shared configuration is running on locator") + .isTrue(); + assertThat(mbeanNames) + .as("GemFire mxbeans on locator") + .isEqualTo(mxbeansOnLocator); + }); + }); + + serverVM.invoke(() -> { + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsExactlyInAnyOrderElementsOf(mxbeansOnServer); + }); + } + + private static LocatorLauncher startLocator(File workingDirectory, int locatorPort, int jmxPort, + String locators) { + LocatorLauncher locatorLauncher = new LocatorLauncher.Builder() + .setMemberName(LOCATOR_NAME) + .setPort(locatorPort) + .setWorkingDirectory(workingDirectory.getAbsolutePath()) + .set(HTTP_SERVICE_PORT, "0") + .set(JMX_MANAGER, "true") + .set(JMX_MANAGER_PORT, String.valueOf(jmxPort)) + .set(JMX_MANAGER_START, "true") + .set(LOCATORS, locators) + .set(LOG_FILE, new File(workingDirectory, LOCATOR_NAME + ".log").getAbsolutePath()) + .set(MAX_WAIT_TIME_RECONNECT, "1000") + .set(MEMBER_TIMEOUT, "2000") + .build(); + + locatorLauncher.start(); + + InternalLocator locator = (InternalLocator) locatorLauncher.getLocator(); + + await().untilAsserted(() -> { + assertThat(locator.isSharedConfigurationRunning()) + .as("Locator shared configuration is running on locator" + getVMId()) + .isTrue(); + }); + + return locatorLauncher; + } + + private static ServerLauncher startServer(File workingDirectory, String locators) { + ServerLauncher serverLauncher = new ServerLauncher.Builder() + .setDisableDefaultServer(true) + .setMemberName(SERVER_NAME) + .setWorkingDirectory(workingDirectory.getAbsolutePath()) + .set(HTTP_SERVICE_PORT, "0") + .set(LOCATORS, locators) + .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath()) + .set(MAX_WAIT_TIME_RECONNECT, "1000") + .set(MEMBER_TIMEOUT, "2000") + .build(); + + serverLauncher.start(); + + return serverLauncher; + } + + private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName) + throws MalformedObjectNameException { + return new HashSet<>(asList( + getInstance("GemFire:type=Member,member=" + memberName), + getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + + ",type=Member,member=" + memberName))); + } + + private static Set<ObjectName> expectedLocatorMXBeans(String memberName) + throws MalformedObjectNameException { + return new HashSet<>(asList( + getInstance("GemFire:service=DiskStore,name=cluster_config,type=Member,member=" + + memberName), + getInstance("GemFire:service=Locator,type=Member,member=" + memberName), + getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Member,member=" + + memberName), + getInstance("GemFire:type=Member,member=" + memberName), + getInstance("GemFire:service=Manager,type=Member,member=" + memberName))); + } + + private static Set<ObjectName> expectedDistributedMXBeans(String regionName) + throws MalformedObjectNameException { + return new HashSet<>(asList( + getInstance("GemFire:service=AccessControl,type=Distributed"), + getInstance("GemFire:service=FileUploader,type=Distributed"), + getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Distributed"), + getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"), + getInstance("GemFire:service=System,type=Distributed"))); + } + + private static <V> V execute(Callable<V> task) { + try { + return task.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java similarity index 63% rename from geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java rename to geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java index 781ffc1..ab1b46c 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/JMXMBeanReconnectDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/JmxServerReconnectDistributedTest.java @@ -32,14 +32,12 @@ import static org.apache.geode.distributed.internal.membership.api.MembershipMan import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPorts; import static org.apache.geode.test.awaitility.GeodeAwaitility.await; import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout; -import static org.apache.geode.test.dunit.Disconnect.disconnectAllFromDS; import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException; import static org.apache.geode.test.dunit.Invoke.invokeInEveryVM; import static org.apache.geode.test.dunit.VM.getVM; import static org.apache.geode.test.dunit.VM.getVMId; import static org.apache.geode.test.dunit.VM.toArray; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; import java.io.File; import java.io.Serializable; @@ -47,11 +45,13 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import javax.management.QueryExp; import org.junit.After; import org.junit.Before; @@ -73,6 +73,7 @@ import org.apache.geode.distributed.internal.membership.api.MemberDisconnectedEx import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.DistributedErrorCollector; +import org.apache.geode.test.dunit.rules.DistributedReference; import org.apache.geode.test.dunit.rules.DistributedRule; import org.apache.geode.test.junit.categories.JMXTest; import org.apache.geode.test.junit.rules.GfshCommandRule; @@ -80,61 +81,66 @@ import org.apache.geode.test.junit.rules.GfshCommandRule.PortType; import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; @Category(JMXTest.class) -@SuppressWarnings("serial") -public class JMXMBeanReconnectDUnitTest implements Serializable { - - private static final long TIMEOUT_MILLIS = getTimeout().toMillis(); - private static final LocatorLauncher DUMMY_LOCATOR = mock(LocatorLauncher.class); - private static final ServerLauncher DUMMY_SERVER = mock(ServerLauncher.class); +@SuppressWarnings({"serial", "CodeBlock2Expr", "SameParameterValue"}) +public class JmxServerReconnectDistributedTest implements Serializable { + + private static final String LOCATOR_1_NAME = "locator1"; + private static final String LOCATOR_2_NAME = "locator2"; + private static final String SERVER_NAME = "server"; + private static final String REGION_NAME = "region"; + private static final QueryExp QUERY_ALL = null; + + private static final ObjectName GEMFIRE_MXBEANS = + execute(() -> getInstance("GemFire:*")); + private static final Set<ObjectName> EXPECTED_SERVER_MXBEANS = + execute(() -> expectedServerMXBeans(SERVER_NAME, REGION_NAME)); + private static final Set<ObjectName> EXPECTED_LOCATOR_1_MXBEANS = + execute(() -> expectedLocatorMXBeans(LOCATOR_1_NAME)); + private static final Set<ObjectName> EXPECTED_LOCATOR_2_MXBEANS = + execute(() -> expectedLocatorMXBeans(LOCATOR_2_NAME)); + private static final Set<ObjectName> EXPECTED_DISTRIBUTED_MXBEANS = + execute(() -> expectedDistributedMXBeans(REGION_NAME)); private static final AtomicReference<CountDownLatch> BEFORE = new AtomicReference<>(new CountDownLatch(0)); private static final AtomicReference<CountDownLatch> AFTER = new AtomicReference<>(new CountDownLatch(0)); - private static final AtomicReference<LocatorLauncher> LOCATOR = - new AtomicReference<>(DUMMY_LOCATOR); - private static final AtomicReference<ServerLauncher> SERVER = - new AtomicReference<>(DUMMY_SERVER); - private VM locator1VM; private VM locator2VM; private VM serverVM; - private String locator1Name; - private String locator2Name; - private String serverName; private String locators; private int locator1Port; private int locator2Port; private int locator1JmxPort; private int locator2JmxPort; - private String regionName; private Set<ObjectName> mxbeansOnServer; private Set<ObjectName> mxbeansOnLocator1; private Set<ObjectName> mxbeansOnLocator2; @Rule - public DistributedRule distributedRule = new DistributedRule(); - @Rule - public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + public DistributedRule distributedRule = new DistributedRule(3); @Rule public DistributedErrorCollector errorCollector = new DistributedErrorCollector(); @Rule + public DistributedReference<LocatorLauncher> locatorLauncher = new DistributedReference<>(); + @Rule + public DistributedReference<ServerLauncher> serverLauncher = new DistributedReference<>(); + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + @Rule public transient GfshCommandRule gfsh = new GfshCommandRule(); @Before public void setUp() throws Exception { - locator1VM = getVM(1); - locator2VM = getVM(-1); - serverVM = getVM(0); + locator1VM = getVM(0); + locator2VM = getVM(1); + serverVM = getVM(2); - locator1Name = "locator1"; - locator2Name = "locator2"; - serverName = "server1"; - File locator1Dir = temporaryFolder.newFolder(locator1Name); - File locator2Dir = temporaryFolder.newFolder(locator2Name); - File serverDir = temporaryFolder.newFolder(serverName); + File locator1Dir = temporaryFolder.newFolder(LOCATOR_1_NAME); + File locator2Dir = temporaryFolder.newFolder(LOCATOR_2_NAME); + File serverDir = temporaryFolder.newFolder(SERVER_NAME); int[] port = getRandomAvailableTCPPorts(4); locator1Port = port[0]; @@ -144,18 +150,19 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { locators = "localhost[" + locator1Port + "],localhost[" + locator2Port + "]"; locator1VM.invoke(() -> { - startLocator(locator1Name, locator1Dir, locator1Port, locator1JmxPort, locators); + locatorLauncher.set( + startLocator(LOCATOR_1_NAME, locator1Dir, locator1Port, locator1JmxPort, locators)); }); locator2VM.invoke(() -> { - startLocator(locator2Name, locator2Dir, locator2Port, locator2JmxPort, locators); + locatorLauncher.set( + startLocator(LOCATOR_2_NAME, locator2Dir, locator2Port, locator2JmxPort, locators)); }); - serverVM.invoke(() -> startServer(serverName, serverDir, locators)); + serverVM.invoke(() -> serverLauncher.set(startServer(serverDir, locators))); gfsh.connectAndVerify(locator1JmxPort, PortType.jmxManager); - regionName = "region1"; - String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + regionName; + String createRegionCommand = "create region --type=REPLICATE --name=" + SEPARATOR + REGION_NAME; gfsh.executeAndAssertThat(createRegionCommand).statusIsSuccess(); addIgnoredException(AlertingIOException.class); @@ -168,37 +175,35 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { mxbeansOnServer = serverVM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsAll(expectedServerMXBeans(serverName, regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsAll(EXPECTED_SERVER_MXBEANS); }); - return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null); + return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); }); mxbeansOnLocator1 = locator1VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") - .containsAll(expectedServerMXBeans(serverName, regionName)) - .containsAll(expectedLocatorMXBeans(locator1Name)) - .containsAll(expectedLocatorMXBeans(locator2Name)) - .containsAll(expectedDistributedMXBeans(regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") + .containsAll(EXPECTED_SERVER_MXBEANS) + .containsAll(EXPECTED_LOCATOR_1_MXBEANS) + .containsAll(EXPECTED_LOCATOR_2_MXBEANS) + .containsAll(EXPECTED_DISTRIBUTED_MXBEANS); }); - - return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null); + return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); }); mxbeansOnLocator2 = locator2VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator2") - .containsAll(expectedServerMXBeans(serverName, regionName)) - .containsAll(expectedLocatorMXBeans(locator2Name)) - .containsAll(expectedLocatorMXBeans(locator1Name)) - .containsAll(expectedDistributedMXBeans(regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator2") + .containsAll(EXPECTED_SERVER_MXBEANS) + .containsAll(EXPECTED_LOCATOR_2_MXBEANS) + .containsAll(EXPECTED_LOCATOR_1_MXBEANS) + .containsAll(EXPECTED_DISTRIBUTED_MXBEANS); }); - - return getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null); + return getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); }); } @@ -207,19 +212,16 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { invokeInEveryVM(() -> { BEFORE.get().countDown(); AFTER.get().countDown(); - SERVER.getAndSet(DUMMY_SERVER).stop(); - LOCATOR.getAndSet(DUMMY_LOCATOR).stop(); }); - disconnectAllFromDS(); } @Test public void serverHasMemberTypeMXBeans() { serverVM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsAll(expectedServerMXBeans(serverName, regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsAll(EXPECTED_SERVER_MXBEANS); }); }); } @@ -229,9 +231,9 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { for (VM locatorVM : toArray(locator1VM, locator2VM)) { locatorVM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsAll(expectedServerMXBeans(serverName, regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsAll(EXPECTED_SERVER_MXBEANS); }); }); } @@ -241,17 +243,17 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { public void locatorHasMemberTypeMXBeansForBothLocators() { locator1VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") - .containsAll(expectedLocatorMXBeans(locator1Name)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") + .containsAll(EXPECTED_LOCATOR_1_MXBEANS); }); }); locator2VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator2") - .containsAll(expectedLocatorMXBeans(locator2Name)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator2") + .containsAll(EXPECTED_LOCATOR_2_MXBEANS); }); }); } @@ -261,9 +263,9 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { for (VM locatorVM : toArray(locator1VM, locator2VM)) { locatorVM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator" + getVMId()) - .containsAll(expectedDistributedMXBeans(regionName)); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator" + getVMId()) + .containsAll(EXPECTED_DISTRIBUTED_MXBEANS); }); }); } @@ -275,29 +277,30 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { @Test public void serverMXBeansOnServerAreUnaffectedByLocatorCrash() { locator1VM.invoke(() -> { - crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem()); + crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem()); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .isEmpty(); }); }); serverVM.invoke(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsExactlyElementsOf(mxbeansOnServer); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsExactlyInAnyOrderElementsOf(mxbeansOnServer); }); locator1VM.invoke(() -> { - InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator(); + InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator(); await().untilAsserted(() -> { boolean isReconnected = locator.isReconnected(); boolean isSharedConfigurationRunning = locator.isSharedConfigurationRunning(); Set<ObjectName> mbeanNames = - getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null); + getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL); + assertThat(isReconnected) .as("Locator is reconnected on locator1") .isTrue(); @@ -305,15 +308,15 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { .as("Locator shared configuration is running on locator1") .isTrue(); assertThat(mbeanNames) - .as("GemFire mbeans on locator1") + .as("GemFire mxbeans on locator1") .isEqualTo(mxbeansOnLocator1); }); }); serverVM.invoke(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsExactlyElementsOf(mxbeansOnServer); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsExactlyInAnyOrderElementsOf(mxbeansOnServer); }); } @@ -323,48 +326,48 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { @Test public void serverMXBeansOnLocatorAreRestoredAfterCrashedServerReturns() { serverVM.invoke(() -> { - crashDistributedSystem(SERVER.get().getCache().getDistributedSystem()); + crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem()); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") .isEmpty(); }); }); locator1VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .doesNotContainAnyElementsOf(mxbeansOnServer); }); }); serverVM.invoke(() -> { - InternalCache cache = (InternalCache) SERVER.get().getCache(); + InternalCache cache = (InternalCache) serverLauncher.get().getCache(); InternalDistributedSystem system = cache.getInternalDistributedSystem(); await().untilAsserted(() -> { assertThat(system.isReconnecting()) - .as("System is reconnecting on server1") + .as("System is reconnecting on server") .isTrue(); }); - system.waitUntilReconnected(TIMEOUT_MILLIS, MILLISECONDS); + system.waitUntilReconnected(getTimeout().toMillis(), MILLISECONDS); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") - .containsExactlyElementsOf(mxbeansOnServer); + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") + .containsExactlyInAnyOrderElementsOf(mxbeansOnServer); }); }); locator1VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .containsAll(mxbeansOnServer) - .containsExactlyElementsOf(mxbeansOnLocator1); + .containsExactlyInAnyOrderElementsOf(mxbeansOnLocator1); }); }); } @@ -384,31 +387,31 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { @Override public void reconnecting(InternalDistributedSystem oldSystem) { try { - BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS); + BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS); } catch (InterruptedException e) { errorCollector.addError(e); } } }); - crashDistributedSystem(LOCATOR.get().getCache().getDistributedSystem()); + crashDistributedSystem(locatorLauncher.get().getCache().getDistributedSystem()); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .isEmpty(); }); }); locator2VM.invoke(() -> { Collection<ObjectName> locator1MXBeans = new ArrayList<>(mxbeansOnLocator1); - locator1MXBeans.removeAll(expectedServerMXBeans(serverName, regionName)); - locator1MXBeans.removeAll(expectedLocatorMXBeans(locator2Name)); - locator1MXBeans.removeAll(expectedDistributedMXBeans(regionName)); + locator1MXBeans.removeAll(EXPECTED_SERVER_MXBEANS); + locator1MXBeans.removeAll(EXPECTED_LOCATOR_2_MXBEANS); + locator1MXBeans.removeAll(EXPECTED_DISTRIBUTED_MXBEANS); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator2") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator2") .doesNotContainAnyElementsOf(locator1MXBeans); }); }); @@ -416,14 +419,14 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { locator1VM.invoke(() -> { BEFORE.get().countDown(); - await().untilAsserted(() -> { - InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator(); + InternalLocator locator = (InternalLocator) locatorLauncher.get().getLocator(); + await().untilAsserted(() -> { assertThat(locator.isSharedConfigurationRunning()) .as("Locator shared configuration is running on locator1") .isTrue(); - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .containsAll(mxbeansOnLocator1); }); }); @@ -445,7 +448,7 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { @Override public void reconnecting(InternalDistributedSystem oldSystem) { try { - BEFORE.get().await(TIMEOUT_MILLIS, MILLISECONDS); + BEFORE.get().await(getTimeout().toMillis(), MILLISECONDS); } catch (InterruptedException e) { errorCollector.addError(e); } @@ -458,11 +461,11 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { } }); - crashDistributedSystem(SERVER.get().getCache().getDistributedSystem()); + crashDistributedSystem(serverLauncher.get().getCache().getDistributedSystem()); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") .isEmpty(); }); }); @@ -470,8 +473,8 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { for (VM locatorVM : toArray(locator1VM, locator2VM)) { locatorVM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator" + locatorVM.getId()) + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator" + locatorVM.getId()) .isNotEmpty() .doesNotContainAnyElementsOf(mxbeansOnServer); }); @@ -480,35 +483,35 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { serverVM.invoke(() -> { BEFORE.get().countDown(); - AFTER.get().await(TIMEOUT_MILLIS, MILLISECONDS); + AFTER.get().await(getTimeout().toMillis(), MILLISECONDS); await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on server1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on server") .isEqualTo(mxbeansOnServer); }); }); locator1VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator1") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator1") .containsAll(mxbeansOnLocator1); }); }); locator2VM.invoke(() -> { await().untilAsserted(() -> { - assertThat(getPlatformMBeanServer().queryNames(getInstance("GemFire:*"), null)) - .as("GemFire mbeans on locator2") + assertThat(getPlatformMBeanServer().queryNames(GEMFIRE_MXBEANS, QUERY_ALL)) + .as("GemFire mxbeans on locator2") .containsAll(mxbeansOnLocator2); }); }); } - private static void startLocator(String name, File workingDirectory, int locatorPort, int jmxPort, - String locators) { - LOCATOR.set(new LocatorLauncher.Builder() + private static LocatorLauncher startLocator(String name, File workingDirectory, int locatorPort, + int jmxPort, String locators) { + LocatorLauncher locatorLauncher = new LocatorLauncher.Builder() .setMemberName(name) .setPort(locatorPort) .setWorkingDirectory(workingDirectory.getAbsolutePath()) @@ -520,39 +523,44 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath()) .set(MAX_WAIT_TIME_RECONNECT, "1000") .set(MEMBER_TIMEOUT, "2000") - .build()); + .build(); + + locatorLauncher.start(); - LOCATOR.get().start(); + InternalLocator locator = (InternalLocator) locatorLauncher.getLocator(); await().untilAsserted(() -> { - InternalLocator locator = (InternalLocator) LOCATOR.get().getLocator(); assertThat(locator.isSharedConfigurationRunning()) .as("Locator shared configuration is running on locator" + getVMId()) .isTrue(); }); + + return locatorLauncher; } - private static void startServer(String name, File workingDirectory, String locators) { - SERVER.set(new ServerLauncher.Builder() + private static ServerLauncher startServer(File workingDirectory, String locators) { + ServerLauncher serverLauncher = new ServerLauncher.Builder() .setDisableDefaultServer(true) - .setMemberName(name) + .setMemberName(SERVER_NAME) .setWorkingDirectory(workingDirectory.getAbsolutePath()) .set(HTTP_SERVICE_PORT, "0") .set(LOCATORS, locators) - .set(LOG_FILE, new File(workingDirectory, name + ".log").getAbsolutePath()) + .set(LOG_FILE, new File(workingDirectory, SERVER_NAME + ".log").getAbsolutePath()) .set(MAX_WAIT_TIME_RECONNECT, "1000") .set(MEMBER_TIMEOUT, "2000") - .build()); + .build(); + + serverLauncher.start(); - SERVER.get().start(); + return serverLauncher; } private static Set<ObjectName> expectedServerMXBeans(String memberName, String regionName) throws MalformedObjectNameException { return new HashSet<>(asList( getInstance("GemFire:type=Member,member=" + memberName), - getInstance("GemFire:service=Region,name=/" + regionName + ",type=Member,member=" + - memberName))); + getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + + ",type=Member,member=" + memberName))); } private static Set<ObjectName> expectedLocatorMXBeans(String memberName) @@ -573,7 +581,15 @@ public class JMXMBeanReconnectDUnitTest implements Serializable { getInstance("GemFire:service=AccessControl,type=Distributed"), getInstance("GemFire:service=FileUploader,type=Distributed"), getInstance("GemFire:service=LockService,name=__CLUSTER_CONFIG_LS,type=Distributed"), - getInstance("GemFire:service=Region,name=/" + regionName + ",type=Distributed"), + getInstance("GemFire:service=Region,name=" + SEPARATOR + regionName + ",type=Distributed"), getInstance("GemFire:service=System,type=Distributed"))); } + + private static <V> V execute(Callable<V> task) { + try { + return task.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java index 36cc35d..60faef5 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/MBeanFederationErrorHandlingDistributedTest.java @@ -178,7 +178,7 @@ public class MBeanFederationErrorHandlingDistributedTest implements Serializable (SystemManagementService) ManagementService.getManagementService(cache); service.startManager(); FederatingManager federatingManager = service.getFederatingManager(); - proxyFactory = federatingManager.getProxyFactory(); + proxyFactory = federatingManager.proxyFactory(); return locatorLauncher.getPort(); } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java index 2efe96e..d9dc552 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerConcurrencyIntegrationTest.java @@ -82,7 +82,7 @@ public class FederatingManagerConcurrencyIntegrationTest { await().until(() -> !cache.getAllRegions().isEmpty()); - assertThat(federatingManager.getAndResetLatestException()).isNull(); + assertThat(federatingManager.latestException()).isNull(); } private InternalDistributedMember member() throws UnknownHostException { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java deleted file mode 100644 index db6b2dd..0000000 --- a/geode-core/src/integrationTest/java/org/apache/geode/management/internal/FederatingManagerIntegrationTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.apache.geode.management.internal; - -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.mockito.quality.Strictness.STRICT_STUBS; - -import java.io.IOException; -import java.util.Collections; -import java.util.concurrent.Executors; - -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.Mock; -import org.mockito.junit.MockitoJUnit; -import org.mockito.junit.MockitoRule; - -import org.apache.geode.StatisticsFactory; -import org.apache.geode.distributed.internal.DistributionManager; -import org.apache.geode.distributed.internal.InternalDistributedSystem; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.internal.cache.InternalCacheForClientAccess; -import org.apache.geode.internal.statistics.StatisticsClock; -import org.apache.geode.test.junit.categories.JMXTest; - -@Category(JMXTest.class) -public class FederatingManagerIntegrationTest { - @Rule - public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS); - - @Mock - public InternalCache cache; - @Mock - public InternalCacheForClientAccess cacheForClientAccess; - @Mock - public MBeanProxyFactory proxyFactory; - @Mock - public MemberMessenger messenger; - @Mock - public ManagementResourceRepo repo; - @Mock - public SystemManagementService service; - @Mock - public StatisticsFactory statisticsFactory; - @Mock - public StatisticsClock statisticsClock; - @Mock - public InternalDistributedSystem system; - @Mock - public DistributionManager distributionManager; - - @Before - public void setUp() throws IOException, ClassNotFoundException { - when(cache.getCacheForProcessingClientRequests()) - .thenReturn(cacheForClientAccess); - when(system.getDistributionManager()) - .thenReturn(distributionManager); - } - - @Test - public void restartDoesNotThrowIfOtherMembersExist() { - when(distributionManager.getOtherDistributionManagerIds()) - .thenReturn(Collections.singleton(mock(InternalDistributedMember.class))); - - FederatingManager federatingManager = - new FederatingManager(repo, system, service, cache, statisticsFactory, - statisticsClock, proxyFactory, messenger, Executors::newSingleThreadExecutor); - - federatingManager.startManager(); - federatingManager.stopManager(); - - assertThatCode(federatingManager::startManager) - .doesNotThrowAnyException(); - } -} diff --git a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java index 1af2974..cadbe26 100644 --- a/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java +++ b/geode-core/src/main/java/org/apache/geode/logging/internal/LoggingSession.java @@ -35,8 +35,8 @@ import org.apache.geode.logging.internal.spi.LogFile; */ public class LoggingSession implements InternalSessionContext { - static final boolean STANDARD_OUTPUT_ALWAYS_ON = - Boolean.valueOf(System.getProperty(GEMFIRE_PREFIX + "standard-output-always-on", "false")); + private static final boolean STANDARD_OUTPUT_ALWAYS_ON = + Boolean.getBoolean(GEMFIRE_PREFIX + "standard-output-always-on"); private static final Logger logger = LogService.getLogger(); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java index 714c9cb..3ba7c21 100755 --- a/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/FederatingManager.java @@ -20,11 +20,13 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import javax.management.Notification; @@ -63,23 +65,26 @@ import org.apache.geode.management.ManagementException; * * @since GemFire 7.0 */ -public class FederatingManager extends Manager { +public class FederatingManager extends Manager implements ManagerMembership { private static final Logger logger = LogService.getLogger(); - private final SystemManagementService service; - private final AtomicReference<Exception> latestException = new AtomicReference<>(); - - private final Supplier<ExecutorService> executorServiceSupplier; - private final MBeanProxyFactory proxyFactory; - private final MemberMessenger messenger; - /** * This Executor uses a pool of thread to execute the member addition /removal tasks, This will * utilize the processing powers available. Going with unbounded queue because tasks wont be * unbounded in practical situation as number of members will be a finite set at any given point * of time */ - private ExecutorService executorService; + private final AtomicReference<ExecutorService> executorService = new AtomicReference<>(); + private final AtomicReference<Exception> latestException = new AtomicReference<>(); + private final List<Runnable> pendingTasks = new CopyOnWriteArrayList<>(); + + private final SystemManagementService service; + private final Supplier<ExecutorService> executorServiceSupplier; + private final MBeanProxyFactory proxyFactory; + private final MemberMessenger messenger; + private final ReentrantLock lifecycleLock; + + private volatile boolean starting; @VisibleForTesting FederatingManager(ManagementResourceRepo repo, InternalDistributedSystem system, @@ -99,6 +104,7 @@ public class FederatingManager extends Manager { this.proxyFactory = proxyFactory; this.messenger = messenger; this.executorServiceSupplier = executorServiceSupplier; + lifecycleLock = new ReentrantLock(); } /** @@ -108,31 +114,69 @@ public class FederatingManager extends Manager { @Override public synchronized void startManager() { try { - if (logger.isDebugEnabled()) { - logger.debug("Starting the Federating Manager.... "); + lifecycleLock.lock(); + try { + if (starting || running) { + return; + } + if (logger.isDebugEnabled()) { + logger.debug("Starting the Federating Manager.... "); + } + starting = true; + executorService.set(executorServiceSupplier.get()); + running = true; + } finally { + lifecycleLock.unlock(); } - executorService = executorServiceSupplier.get(); - - running = true; startManagingActivity(); + + lifecycleLock.lock(); + try { + for (Runnable task : pendingTasks) { + executeTask(task); + } + } finally { + pendingTasks.clear(); + starting = false; + lifecycleLock.unlock(); + } + messenger.broadcastManagerInfo(); + } catch (Exception e) { - running = false; + cleanupFailedStart(); throw new ManagementException(e); } } + private void cleanupFailedStart() { + lifecycleLock.lock(); + try { + pendingTasks.clear(); + running = false; + starting = false; + } finally { + lifecycleLock.unlock(); + } + } + @Override public synchronized void stopManager() { - // remove hidden management regions and federatedMBeans - if (!running) { - return; - } - running = false; - if (logger.isDebugEnabled()) { - logger.debug("Stopping the Federating Manager.... "); + lifecycleLock.lock(); + try { + // remove hidden management regions and federatedMBeans + if (!running) { + return; + } + if (logger.isDebugEnabled()) { + logger.debug("Stopping the Federating Manager.... "); + } + running = false; + } finally { + lifecycleLock.unlock(); } + stopManagingActivity(); } @@ -141,35 +185,56 @@ public class FederatingManager extends Manager { return running; } - public MemberMessenger getMessenger() { - return messenger; + /** + * This method will delegate task to another thread and exit, so that it wont block the membership + * listener + */ + @Override + public void addMember(InternalDistributedMember member) { + lifecycleLock.lock(); + try { + if (!running) { + return; + } + executeTask(() -> new AddMemberTask(member).call()); + } finally { + lifecycleLock.unlock(); + } } /** - * This method will be invoked from MembershipListener which is registered when the member becomes - * a Management node. - * - * <p> * This method will delegate task to another thread and exit, so that it wont block the membership * listener */ - void removeMember(DistributedMember member, boolean crashed) { - executeTask(new RemoveMemberTask(member, crashed)); + @Override + public void removeMember(DistributedMember member, boolean crashed) { + lifecycleLock.lock(); + try { + Runnable task = new RemoveMemberTask(member, crashed); + if (starting) { + pendingTasks.add(task); + } else if (running) { + executeTask(task); + } + } finally { + lifecycleLock.unlock(); + } } /** - * This method will be invoked from MembershipListener which is registered when the member becomes - * a Management node. - * - * <p> * this method will delegate task to another thread and exit, so that it wont block the membership * listener */ - void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected, + @Override + public void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected, String reason) { service.memberSuspect((InternalDistributedMember) member, whoSuspected, reason); } + public MemberMessenger getMessenger() { + return messenger; + } + /** * This will return the last updated time of the proxyMBean. * @@ -207,30 +272,6 @@ public class FederatingManager extends Manager { } /** - * This method will be invoked whenever a member stops being a managing node. The - * {@code ManagementException} has to be handled by the caller. - */ - private void stopManagingActivity() { - try { - executorService.shutdownNow(); - - for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) { - removeMemberArtifacts(distributedMember, false); - } - } catch (Exception e) { - throw new ManagementException(e); - } - } - - private synchronized void executeTask(Runnable task) { - try { - executorService.execute(task); - } catch (RejectedExecutionException ignored) { - // Ignore, we are getting shutdown - } - } - - /** * This method will be invoked when a node transitions from managed node to managing node This * method will block for all GIIs to be completed But each GII is given a specific time frame. * After that the task will be marked as cancelled. @@ -242,7 +283,7 @@ public class FederatingManager extends Manager { for (InternalDistributedMember member : system.getDistributionManager() .getOtherDistributionManagerIds()) { - giiTaskList.add(new GIITask(member)); + giiTaskList.add(new AddMemberTask(member)); } try { @@ -250,7 +291,7 @@ public class FederatingManager extends Manager { logger.debug("Management Resource creation started : "); } List<Future<InternalDistributedMember>> futureTaskList = - executorService.invokeAll(giiTaskList); + executorService.get().invokeAll(giiTaskList); for (Future<InternalDistributedMember> futureTask : futureTaskList) { try { @@ -296,76 +337,27 @@ public class FederatingManager extends Manager { } /** - * This method will be invoked from MembershipListener which is registered when the member becomes - * a Management node. - * - * <p> - * This method will delegate task to another thread and exit, so that it wont block the membership - * listener + * This method will be invoked whenever a member stops being a managing node. The + * {@code ManagementException} has to be handled by the caller. */ - @VisibleForTesting - void addMember(InternalDistributedMember member) { - GIITask giiTask = new GIITask(member); - executeTask(() -> { - try { - giiTask.call(); - } catch (RuntimeException e) { - logger.warn("Error federating new member {}", member.getId(), e); - latestException.set(e); - } - }); - } - - @VisibleForTesting - void removeMemberArtifacts(DistributedMember member, boolean crashed) { - Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member); - Region<NotificationKey, Notification> notificationRegion = - repo.getEntryFromNotifRegionMap(member); - - if (monitoringRegion == null && notificationRegion == null) { - return; - } - - repo.romoveEntryFromMonitoringRegionMap(member); - repo.removeEntryFromNotifRegionMap(member); - - // If cache is closed all the regions would have been destroyed implicitly - if (!cache.isClosed()) { - try { - if (monitoringRegion != null) { - proxyFactory.removeAllProxies(member, monitoringRegion); - monitoringRegion.localDestroyRegion(); - } - } catch (CancelException | RegionDestroyedException ignore) { - // ignored - } - - try { - if (notificationRegion != null) { - notificationRegion.localDestroyRegion(); - } - } catch (CancelException | RegionDestroyedException ignore) { - // ignored - } - } + private void stopManagingActivity() { + try { + executorService.get().shutdownNow(); - if (!system.getDistributedMember().equals(member)) { - try { - service.memberDeparted((InternalDistributedMember) member, crashed); - } catch (CancelException | RegionDestroyedException ignore) { - // ignored + for (DistributedMember distributedMember : repo.getMonitoringRegionMap().keySet()) { + removeMemberArtifacts(distributedMember, false); } + } catch (Exception e) { + throw new ManagementException(e); } } - @VisibleForTesting - public MBeanProxyFactory getProxyFactory() { - return proxyFactory; - } - - @VisibleForTesting - synchronized Exception getAndResetLatestException() { - return latestException.getAndSet(null); + private synchronized void executeTask(Runnable task) { + try { + executorService.get().execute(task); + } catch (RejectedExecutionException ignored) { + // Ignore, we are getting shutdown + } } @VisibleForTesting @@ -503,19 +495,80 @@ public class FederatingManager extends Manager { } } + @VisibleForTesting + void removeMemberArtifacts(DistributedMember member, boolean crashed) { + Region<String, Object> monitoringRegion = repo.getEntryFromMonitoringRegionMap(member); + Region<NotificationKey, Notification> notificationRegion = + repo.getEntryFromNotifRegionMap(member); + + if (monitoringRegion == null && notificationRegion == null) { + return; + } + + repo.romoveEntryFromMonitoringRegionMap(member); + repo.removeEntryFromNotifRegionMap(member); + + // If cache is closed all the regions would have been destroyed implicitly + if (!cache.isClosed()) { + try { + if (monitoringRegion != null) { + proxyFactory.removeAllProxies(member, monitoringRegion); + monitoringRegion.localDestroyRegion(); + } + } catch (CancelException | RegionDestroyedException ignore) { + // ignored + } + + try { + if (notificationRegion != null) { + notificationRegion.localDestroyRegion(); + } + } catch (CancelException | RegionDestroyedException ignore) { + // ignored + } + } + + if (!system.getDistributedMember().equals(member)) { + try { + service.memberDeparted((InternalDistributedMember) member, crashed); + } catch (CancelException | RegionDestroyedException ignore) { + // ignored + } + } + } + + @VisibleForTesting + public MBeanProxyFactory proxyFactory() { + return proxyFactory; + } + + @VisibleForTesting + Exception latestException() { + return latestException.getAndSet(null); + } + + @VisibleForTesting + List<Runnable> pendingTasks() { + return pendingTasks; + } + + @VisibleForTesting + boolean isStarting() { + return starting; + } + /** - * Actual task of doing the GII + * Actual task of adding a member. * * <p> - * It will perform the GII request which might originate from TransitionListener or Membership - * Listener. + * Perform the GII request which might originate from transition listener or membership listener. * * <p> - * Managing Node side resources are created per member which is visible to this node: + * Manager resources are created per member which is visible to this node: * * <pre> - * 1)Management Region : its a Replicated NO_ACK region - * 2)Notification Region : its a Replicated Proxy NO_ACK region + * 1) Management Region : its a Replicated NO_ACK region + * 2) Notification Region : its a Replicated Proxy NO_ACK region * </pre> * * <p> @@ -528,13 +581,13 @@ public class FederatingManager extends Manager { * * <p> * This task can be cancelled from the calling thread if a timeout happens. In that case we have - * to handle the thread interrupt + * to handle the thread interrupt. */ - private class GIITask implements Callable<InternalDistributedMember> { + private class AddMemberTask implements Callable<InternalDistributedMember> { private final InternalDistributedMember member; - private GIITask(InternalDistributedMember member) { + private AddMemberTask(InternalDistributedMember member) { this.member = member; } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java index b67d8f5..b28af5b 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/LocalManager.java @@ -14,12 +14,15 @@ */ package org.apache.geode.management.internal; +import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor; + import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.management.MalformedObjectNameException; import javax.management.Notification; @@ -43,7 +46,6 @@ import org.apache.geode.internal.cache.HasCachePerfStats; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.InternalRegionFactory; import org.apache.geode.internal.statistics.StatisticsClock; -import org.apache.geode.logging.internal.executors.LoggingExecutors; import org.apache.geode.logging.internal.log4j.api.LogService; import org.apache.geode.management.ManagementException; @@ -61,13 +63,14 @@ public class LocalManager extends Manager { /** * Management Task pushes data to the admin regions */ - private ManagementTask managementTask; + private final AtomicReference<ManagementTask> managementTask = new AtomicReference<>(); /** * This service will be responsible for executing ManagementTasks and periodically push data to * localMonitoringRegion */ - private ScheduledExecutorService singleThreadFederationScheduler; + private final AtomicReference<ScheduledExecutorService> singleThreadFederationScheduler = + new AtomicReference<>(); /** * This map holds all the components which are eligible for federation. Although filters might @@ -103,8 +106,7 @@ public class LocalManager extends Manager { if (repo.getLocalMonitoringRegion() != null) { return; } - singleThreadFederationScheduler = - LoggingExecutors.newSingleThreadScheduledExecutor("Management Task"); + singleThreadFederationScheduler.set(newSingleThreadScheduledExecutor("Management Task")); if (logger.isDebugEnabled()) { logger.debug("Creating Management Region :"); @@ -165,14 +167,14 @@ public class LocalManager extends Manager { } } - managementTask = new ManagementTask(); + managementTask.set(new ManagementTask()); // call run to get us initialized immediately with a sync call - managementTask.run(); + managementTask.get().run(); // All local resources are created for the ManagementTask // Now Management tasks can proceed. int updateRate = system.getConfig().getJmxManagerUpdateRate(); - singleThreadFederationScheduler.scheduleAtFixedRate(managementTask, updateRate, updateRate, - TimeUnit.MILLISECONDS); + singleThreadFederationScheduler.get().scheduleAtFixedRate(managementTask.get(), updateRate, + updateRate, TimeUnit.MILLISECONDS); if (logger.isDebugEnabled()) { logger.debug("Management Region created with Name : {}", @@ -206,8 +208,9 @@ public class LocalManager extends Manager { private void shutdownTasks() { // No need of pooledGIIExecutor as this node wont do GII again // so better to release resources - if (singleThreadFederationScheduler != null) { - singleThreadFederationScheduler.shutdownNow(); + ScheduledExecutorService executor = singleThreadFederationScheduler.get(); + if (executor != null) { + executor.shutdownNow(); } } @@ -246,7 +249,7 @@ public class LocalManager extends Manager { */ @VisibleForTesting public ScheduledExecutorService getFederationScheduler() { - return singleThreadFederationScheduler; + return singleThreadFederationScheduler.get(); } /** @@ -255,7 +258,7 @@ public class LocalManager extends Manager { */ @VisibleForTesting public void runManagementTaskAdhoc() { - managementTask.run(); + managementTask.get().run(); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java index 7a48465..2adcb47 100755 --- a/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/Manager.java @@ -28,21 +28,11 @@ import org.apache.geode.internal.statistics.StatisticsClock; * * @since GemFire 7.0 */ -public abstract class Manager { +public abstract class Manager implements ManagerLifecycle { protected final InternalCacheForClientAccess cache; /** - * depicts whether this node is a Managing node or not - */ - protected volatile boolean running; - - /** - * depicts whether this node is a Managing node or not - */ - protected volatile boolean stopCacheOps; - - /** * This is a single window to manipulate region resources for management */ protected final ManagementResourceRepo repo; @@ -56,8 +46,15 @@ public abstract class Manager { protected final StatisticsClock statisticsClock; - public Manager(ManagementResourceRepo repo, InternalDistributedSystem system, - InternalCache cache, StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) { + /** + * True if this node is a Geode JMX manager. + */ + protected volatile boolean running; + + protected volatile boolean stopCacheOps; + + Manager(ManagementResourceRepo repo, InternalDistributedSystem system, InternalCache cache, + StatisticsFactory statisticsFactory, StatisticsClock statisticsClock) { this.repo = repo; this.cache = cache.getCacheForProcessingClientRequests(); this.system = system; @@ -65,12 +62,6 @@ public abstract class Manager { this.statisticsClock = statisticsClock; } - public abstract boolean isRunning(); - - public abstract void startManager(); - - public abstract void stopManager(); - @VisibleForTesting public ManagementResourceRepo getManagementResourceRepo() { return repo; diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java new file mode 100644 index 0000000..0ae8382 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerLifecycle.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal; + +/** + * Lifecycle operations for Geode JMX managers. + */ +public interface ManagerLifecycle { + + /** + * Start the manager. + */ + void startManager(); + + /** + * Stop the manager. + */ + void stopManager(); + + /** + * Returns true if the manager is running. + */ + boolean isRunning(); +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java new file mode 100644 index 0000000..845ad55 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/management/internal/ManagerMembership.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.management.internal; + +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; + +/** + * Membership operations for Geode JMX managers. + */ +public interface ManagerMembership { + + /** + * This method will be invoked from MembershipListener which is registered when the member becomes + * a Management node. + */ + void addMember(InternalDistributedMember member); + + /** + * This method will be invoked from MembershipListener which is registered when the member becomes + * a Management node. + */ + void removeMember(DistributedMember member, boolean crashed); + + /** + * This method will be invoked from MembershipListener which is registered when the member becomes + * a Management node. + */ + void suspectMember(DistributedMember member, InternalDistributedMember whoSuspected, + String reason); +} diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java index 27588c2..10a2f75 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/DistributedSystemBridge.java @@ -23,6 +23,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; @@ -283,6 +284,8 @@ public class DistributedSystemBridge { } if (mapOfMembers != null) { + Objects.requireNonNull(objectName); + Objects.requireNonNull(proxy); mapOfMembers.put(objectName, proxy); memberSetSize = mapOfMembers.values().size(); diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java index 2de1c51..3fda782 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/FederatingManagerTest.java @@ -14,23 +14,37 @@ */ package org.apache.geode.management.internal; +import static java.util.Collections.singleton; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.catchThrowable; +import static org.mockito.ArgumentCaptor.forClass; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.mockito.Mockito.when; import java.net.InetAddress; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.function.Supplier; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ErrorCollector; import org.mockito.ArgumentCaptor; import org.apache.geode.StatisticsFactory; @@ -48,17 +62,17 @@ import org.apache.geode.internal.cache.InternalCacheForClientAccess; import org.apache.geode.internal.cache.InternalRegionFactory; import org.apache.geode.internal.statistics.StatisticsClock; import org.apache.geode.management.DistributedSystemMXBean; +import org.apache.geode.management.ManagementException; import org.apache.geode.test.junit.categories.JMXTest; +import org.apache.geode.test.junit.rules.ExecutorServiceRule; @Category(JMXTest.class) public class FederatingManagerTest { private InternalCache cache; - private InternalCacheForClientAccess cacheForClientAccess; private ExecutorService executorService; - private MBeanJMXAdapter jmxAdapter; - private MBeanProxyFactory proxyFactory; private MemberMessenger messenger; + private MBeanProxyFactory proxyFactory; private ManagementResourceRepo repo; private SystemManagementService service; private StatisticsFactory statisticsFactory; @@ -67,12 +81,15 @@ public class FederatingManagerTest { private InternalRegionFactory regionFactory1; private InternalRegionFactory regionFactory2; + @Rule + public ErrorCollector errorCollector = new ErrorCollector(); + @Rule + public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); + @Before - public void setUp() throws Exception { + public void setUp() { cache = mock(InternalCache.class); - cacheForClientAccess = mock(InternalCacheForClientAccess.class); executorService = mock(ExecutorService.class); - jmxAdapter = mock(MBeanJMXAdapter.class); messenger = mock(MemberMessenger.class); proxyFactory = mock(MBeanProxyFactory.class); repo = mock(ManagementResourceRepo.class); @@ -83,14 +100,19 @@ public class FederatingManagerTest { regionFactory1 = mock(InternalRegionFactory.class); regionFactory2 = mock(InternalRegionFactory.class); + InternalCacheForClientAccess cacheForClientAccess = mock(InternalCacheForClientAccess.class); DistributedSystemMXBean distributedSystemMXBean = mock(DistributedSystemMXBean.class); + MBeanJMXAdapter jmxAdapter = mock(MBeanJMXAdapter.class); when(cache.getCacheForProcessingClientRequests()) .thenReturn(cacheForClientAccess); - when(cacheForClientAccess.createInternalRegionFactory()).thenReturn(regionFactory1) + when(cacheForClientAccess.createInternalRegionFactory()) + .thenReturn(regionFactory1) .thenReturn(regionFactory2); - when(regionFactory1.create(any())).thenReturn(mock(Region.class)); - when(regionFactory2.create(any())).thenReturn(mock(Region.class)); + when(regionFactory1.create(any())) + .thenReturn(mock(Region.class)); + when(regionFactory2.create(any())) + .thenReturn(mock(Region.class)); when(distributedSystemMXBean.getAlertLevel()) .thenReturn(AlertLevel.WARNING.name()); when(jmxAdapter.getDistributedSystemMXBean()) @@ -100,9 +122,10 @@ public class FederatingManagerTest { } @Test - public void addMemberArtifactsCreatesMonitoringRegion() throws Exception { - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + public void addMemberArtifactsCreatesMonitoringRegion() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.startManager(); federatingManager.addMemberArtifacts(member(1, 20)); @@ -111,23 +134,24 @@ public class FederatingManagerTest { } @Test - public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() throws Exception { - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + public void addMemberArtifactsCreatesMonitoringRegionWithHasOwnStats() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.startManager(); federatingManager.addMemberArtifacts(member(2, 40)); - ArgumentCaptor<HasCachePerfStats> captor = - ArgumentCaptor.forClass(HasCachePerfStats.class); + ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class); verify(regionFactory1).setCachePerfStatsHolder(captor.capture()); assertThat(captor.getValue().hasOwnStats()).isTrue(); } @Test public void addMemberArtifactsCreatesNotificationRegion() { - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.startManager(); federatingManager.addMemberArtifacts(member(3, 60)); @@ -137,14 +161,14 @@ public class FederatingManagerTest { @Test public void addMemberArtifactsCreatesNotificationRegionWithHasOwnStats() { - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.startManager(); federatingManager.addMemberArtifacts(member(4, 80)); - ArgumentCaptor<HasCachePerfStats> captor = - ArgumentCaptor.forClass(HasCachePerfStats.class); + ArgumentCaptor<HasCachePerfStats> captor = forClass(HasCachePerfStats.class); verify(regionFactory2).setCachePerfStatsHolder(captor.capture()); assertThat(captor.getValue().hasOwnStats()).isTrue(); } @@ -159,13 +183,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(monitoringRegion) - .localDestroyRegion(); + verify(monitoringRegion).localDestroyRegion(); } @Test @@ -178,13 +202,13 @@ public class FederatingManagerTest { .thenReturn(notificationRegion); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(notificationRegion) - .localDestroyRegion(); + verify(notificationRegion).localDestroyRegion(); } @Test @@ -199,13 +223,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(monitoringRegion) - .localDestroyRegion(); + verify(monitoringRegion).localDestroyRegion(); } @Test @@ -220,13 +244,13 @@ public class FederatingManagerTest { .thenReturn(notificationRegion); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(notificationRegion) - .localDestroyRegion(); + verify(notificationRegion).localDestroyRegion(); } @Test @@ -242,13 +266,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(proxyFactory) - .removeAllProxies(member, monitoringRegion); + verify(proxyFactory).removeAllProxies(member, monitoringRegion); } @Test @@ -264,13 +288,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(proxyFactory) - .removeAllProxies(member, monitoringRegion); + verify(proxyFactory).removeAllProxies(member, monitoringRegion); } @Test @@ -285,13 +309,13 @@ public class FederatingManagerTest { .thenReturn(notificationRegion); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(notificationRegion) - .localDestroyRegion(); + verify(notificationRegion).localDestroyRegion(); } @Test @@ -306,13 +330,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(monitoringRegion) - .localDestroyRegion(); + verify(monitoringRegion).localDestroyRegion(); } @Test @@ -328,13 +352,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(proxyFactory) - .removeAllProxies(member, monitoringRegion); + verify(proxyFactory).removeAllProxies(member, monitoringRegion); } @Test @@ -349,13 +373,13 @@ public class FederatingManagerTest { .thenReturn(notificationRegion); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(notificationRegion) - .localDestroyRegion(); + verify(notificationRegion).localDestroyRegion(); } @Test @@ -370,13 +394,13 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); - verify(monitoringRegion) - .localDestroyRegion(); + verify(monitoringRegion).localDestroyRegion(); } @Test @@ -388,13 +412,13 @@ public class FederatingManagerTest { .thenReturn(null); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false)); - assertThat(thrown) - .isNull(); + assertThat(thrown).isNull(); } @Test @@ -406,22 +430,23 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); Throwable thrown = catchThrowable(() -> federatingManager.removeMemberArtifacts(member, false)); - assertThat(thrown) - .isNull(); + assertThat(thrown).isNull(); } @Test public void startManagerGetsNewExecutorServiceFromSupplier() { - @SuppressWarnings("unchecked") Supplier<ExecutorService> executorServiceSupplier = mock(Supplier.class); - when(executorServiceSupplier.get()).thenReturn(mock(ExecutorService.class)); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorServiceSupplier); + when(executorServiceSupplier.get()) + .thenReturn(mock(ExecutorService.class)); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorServiceSupplier); federatingManager.startManager(); @@ -437,23 +462,171 @@ public class FederatingManagerTest { .thenReturn(mock(Region.class)); when(system.getDistributedMember()) .thenReturn(member); - FederatingManager federatingManager = new FederatingManager(repo, system, service, cache, - statisticsFactory, statisticsClock, proxyFactory, messenger, executorService); + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, + statisticsClock, proxyFactory, messenger, executorService); federatingManager.removeMemberArtifacts(member, false); verifyNoMoreInteractions(proxyFactory); } + @Test + public void removeMemberWaitsForStartManager() throws Exception { + CountDownLatch latch = new CountDownLatch(1); + CyclicBarrier barrier = new CyclicBarrier(2); + ExecutorService executorService = mock(ExecutorService.class); + List<Future<InternalDistributedMember>> futureTaskList = Collections.emptyList(); + + when(executorService.invokeAll(any())).thenAnswer(invocation -> { + awaitCyclicBarrier(barrier); + awaitCountDownLatch(latch); + return futureTaskList; + }); + + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + + executorServiceRule.submit(() -> { + federatingManager.startManager(); + }); + + executorServiceRule.submit(() -> { + awaitCyclicBarrier(barrier); + federatingManager.removeMember(member(), true); + }); + + await().untilAsserted(() -> { + assertThat(federatingManager.pendingTasks()).hasSize(1); + }); + + latch.countDown(); + + await().untilAsserted(() -> { + assertThat(federatingManager.pendingTasks()).isEmpty(); + }); + } + + @Test + public void pendingTasksIsEmptyByDefault() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + + assertThat(federatingManager.pendingTasks()).isEmpty(); + } + + @Test + public void restartDoesNotThrowIfOtherMembersExist() { + DistributionManager distributionManager = mock(DistributionManager.class); + when(distributionManager.getOtherDistributionManagerIds()) + .thenReturn(singleton(mock(InternalDistributedMember.class))); + when(system.getDistributionManager()) + .thenReturn(distributionManager); + + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, Executors::newSingleThreadExecutor); + + federatingManager.startManager(); + federatingManager.stopManager(); + + assertThatCode(federatingManager::startManager) + .doesNotThrowAnyException(); + } + + @Test + public void startManagerThrowsManagementExceptionWithNestedCauseOfFailure() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + RuntimeException exception = new RuntimeException("startManager failed"); + doThrow(exception) + .when(messenger).broadcastManagerInfo(); + + Throwable thrown = catchThrowable(() -> federatingManager.startManager()); + + assertThat(thrown) + .isInstanceOf(ManagementException.class) + .hasCause(exception); + } + + @Test + public void pendingTasksIsClearIfStartManagerFails() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + RuntimeException exception = new RuntimeException("startManager failed"); + doThrow(exception) + .when(messenger).broadcastManagerInfo(); + + Throwable thrown = catchThrowable(() -> federatingManager.startManager()); + assertThat(thrown).isNotNull(); + + assertThat(federatingManager.pendingTasks()).isEmpty(); + } + + @Test + public void startingIsFalseIfStartManagerFails() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + RuntimeException exception = new RuntimeException("startManager failed"); + doThrow(exception) + .when(messenger).broadcastManagerInfo(); + + Throwable thrown = catchThrowable(() -> federatingManager.startManager()); + assertThat(thrown).isNotNull(); + + assertThat(federatingManager.isStarting()).isFalse(); + } + + @Test + public void runningIsFalseIfStartManagerFails() { + FederatingManager federatingManager = + new FederatingManager(repo, system, service, cache, statisticsFactory, statisticsClock, + proxyFactory, messenger, executorService); + RuntimeException exception = new RuntimeException("startManager failed"); + doThrow(exception) + .when(messenger).broadcastManagerInfo(); + + Throwable thrown = catchThrowable(() -> federatingManager.startManager()); + assertThat(thrown).isNotNull(); + + assertThat(federatingManager.isRunning()).isFalse(); + } + + private void awaitCyclicBarrier(CyclicBarrier barrier) { + try { + barrier.await(getTimeout().toMillis(), MILLISECONDS); + } catch (Exception e) { + errorCollector.addError(e); + throw new RuntimeException(e); + } + } + + private void awaitCountDownLatch(CountDownLatch latch) { + try { + latch.await(getTimeout().toMillis(), MILLISECONDS); + } catch (Exception e) { + errorCollector.addError(e); + throw new RuntimeException(e); + } + } + private InternalDistributedMember member() { return member(1, 1); } private InternalDistributedMember member(int viewId, int port) { InternalDistributedMember member = mock(InternalDistributedMember.class); - when(member.getInetAddress()).thenReturn(mock(InetAddress.class)); - when(member.getVmViewId()).thenReturn(viewId); - when(member.getMembershipPort()).thenReturn(port); + when(member.getInetAddress()) + .thenReturn(mock(InetAddress.class)); + when(member.getVmViewId()) + .thenReturn(viewId); + when(member.getMembershipPort()) + .thenReturn(port); return member; } }