http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java index a3c8b27..2c7ae07 100644 --- a/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/ClientHealthStatsDUnitTest.java @@ -14,13 +14,25 @@ */ package org.apache.geode.management; +import static java.util.concurrent.TimeUnit.*; import static org.apache.geode.distributed.ConfigurationProperties.*; -import static org.apache.geode.test.dunit.Assert.*; +import static org.apache.geode.test.dunit.Host.*; +import static org.apache.geode.test.dunit.IgnoredException.*; +import static org.apache.geode.test.dunit.Invoke.*; +import static org.apache.geode.test.dunit.NetworkUtils.*; +import static org.assertj.core.api.Assertions.*; +import java.io.Serializable; import java.util.Collection; -import java.util.Iterator; import java.util.Properties; +import javax.management.ObjectName; + +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -29,22 +41,17 @@ import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionFactory; import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionFactory; import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; -import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.IgnoredException; +import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.categories.FlakyTest; @@ -52,415 +59,343 @@ import org.apache.geode.test.junit.categories.FlakyTest; * Client health stats check */ @Category(DistributedTest.class) -@SuppressWarnings("serial") -public class ClientHealthStatsDUnitTest extends JUnit4DistributedTestCase { - - private static final String k1 = "k1"; - private static final String k2 = "k2"; - private static final String client_k1 = "client-k1"; - private static final String client_k2 = "client-k2"; +@SuppressWarnings({ "serial", "unused" }) +public class ClientHealthStatsDUnitTest implements Serializable { - /** name of the test region */ - private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region"; + private static final int NUMBER_PUTS = 100; - private static VM client = null; - private static VM client2 = null; - private static VM managingNode = null; + private static final String KEY1 = "KEY1"; + private static final String KEY2 = "KEY2"; + private static final String VALUE1 = "VALUE1"; + private static final String VALUE2 = "VALUE2"; - private static ManagementTestBase helper = new ManagementTestBase() {}; + private static final String REGION_NAME = ClientHealthStatsDUnitTest.class.getSimpleName() + "_Region"; - private static int numOfCreates = 0; - private static int numOfUpdates = 0; - private static int numOfInvalidates = 0; - private static boolean lastKeyReceived = false; + // client1VM and client2VM VM fields + private static ClientCache clientCache; - private static GemFireCacheImpl cache = null; + // TODO: assert following values in each client VM + private static int numOfCreates; + private static int numOfUpdates; + private static int numOfInvalidates; + private static boolean lastKeyReceived; - private VM server = null; + private VM managerVM; + private VM serverVM; + private VM client1VM; + private VM client2VM; - @Override - public final void postSetUp() throws Exception { - disconnectAllFromDS(); + private String hostName; - final Host host = Host.getHost(0); - managingNode = host.getVM(0); - server = host.getVM(1); - client = host.getVM(2); - client2 = host.getVM(3); + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().build(); - IgnoredException.addIgnoredException("Connection reset"); - } + @Before + public void before() throws Exception { + this.hostName = getServerHostName(getHost(0)); - @Override - public final void preTearDown() throws Exception { - reset(); - helper.closeCache(managingNode); - helper.closeCache(client); - helper.closeCache(client2); - helper.closeCache(server); + this.managerVM = getHost(0).getVM(0); + this.serverVM = getHost(0).getVM(1); + this.client1VM = getHost(0).getVM(2); + this.client2VM = getHost(0).getVM(3); - disconnectAllFromDS(); + addIgnoredException("Connection reset"); } - private static void reset() throws Exception { - lastKeyReceived = false; - numOfCreates = 0; - numOfUpdates = 0; - numOfInvalidates = 0; + @After + public void after() throws Exception { + invokeInEveryVM(() -> { + lastKeyReceived = false; + numOfCreates = 0; + numOfUpdates = 0; + numOfInvalidates = 0; + clientCache = null; + }); } @Test public void testClientHealthStats_SubscriptionEnabled() throws Exception { - helper.createManagementCache(managingNode); - helper.startManagingNode(managingNode); + this.managementTestRule.createManager(this.managerVM, false); + this.managementTestRule.startManager(this.managerVM); - int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache()); + int port = this.serverVM.invoke(() -> createServerCache()); - DistributedMember serverMember = helper.getMember(server); + this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); + this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true)); - client.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false)); + this.client1VM.invoke(() -> put()); + this.client2VM.invoke(() -> put()); - client2.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, false)); + DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM); + this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2)); - client.invoke(() -> ClientHealthStatsDUnitTest.put()); - client2.invoke(() -> ClientHealthStatsDUnitTest.put()); - - managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2)); - helper.stopManagingNode(managingNode); + this.managementTestRule.stopManager(this.managerVM); } @Test public void testClientHealthStats_SubscriptionDisabled() throws Exception { - helper.createManagementCache(managingNode); - helper.startManagingNode(managingNode); - - int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache()); - - DistributedMember serverMember = helper.getMember(server); + this.managementTestRule.createManager(this.managerVM, false); + this.managementTestRule.startManager(this.managerVM); - client.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, - false, false)); + int port = this.serverVM.invoke(() -> createServerCache()); - client2.invoke(() -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, - false, false)); + this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, false)); + this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, false)); - client.invoke(() -> ClientHealthStatsDUnitTest.put()); - client2.invoke(() -> ClientHealthStatsDUnitTest.put()); + this.client1VM.invoke(() -> put()); + this.client2VM.invoke(() -> put()); - managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 0)); - helper.stopManagingNode(managingNode); + DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM); + this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 0)); + this.managementTestRule.stopManager(this.managerVM); } @Test public void testClientHealthStats_DurableClient() throws Exception { - helper.createManagementCache(managingNode); - helper.startManagingNode(managingNode); + this.managementTestRule.createManager(this.managerVM, false); + this.managementTestRule.startManager(this.managerVM); - int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache()); + int port = this.serverVM.invoke(() -> createServerCache()); - DistributedMember serverMember = helper.getMember(server); + this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); + this.client2VM.invoke(() -> createClientCache(this.hostName, port, 2, true)); - client.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, true)); + this.client1VM.invoke(() -> put()); + this.client2VM.invoke(() -> put()); - client2.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 2, true, true)); + this.client1VM.invoke(() -> clientCache.close(true)); + this.client2VM.invoke(() -> clientCache.close(true)); - client.invoke(() -> ClientHealthStatsDUnitTest.put()); - client2.invoke(() -> ClientHealthStatsDUnitTest.put()); - - client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache()); - - client2.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache()); - - managingNode.invoke(() -> ClientHealthStatsDUnitTest.verifyClientStats(serverMember, port, 2)); - helper.stopManagingNode(managingNode); + DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM); + this.managerVM.invoke(() -> verifyClientStats(serverMember, port, 2)); + this.managementTestRule.stopManager(this.managerVM); } - @Category(FlakyTest.class) // GEODE-337 @Test public void testStatsMatchWithSize() throws Exception { - // start a server - int port = (Integer) server.invoke(() -> ClientHealthStatsDUnitTest.createServerCache()); - // create durable client, with durable RI - client.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false)); - // do puts on server from three different threads, pause after 500 puts each. - server.invoke(() -> ClientHealthStatsDUnitTest.doPuts()); - // close durable client - client.invoke(() -> ClientHealthStatsDUnitTest.closeClientCache()); - - server.invoke("verifyProxyHasBeenPaused", () -> verifyProxyHasBeenPaused()); - // resume puts on server, add another 100. - server.invokeAsync(() -> ClientHealthStatsDUnitTest.resumePuts()); - // start durable client - client.invoke( - () -> ClientHealthStatsDUnitTest.createClientCache(server.getHost(), port, 1, true, false)); - // wait for full queue dispatch - client.invoke(() -> ClientHealthStatsDUnitTest.waitForLastKey()); - // verify the stats - server.invoke(() -> ClientHealthStatsDUnitTest.verifyStats(port)); - } + int port = this.serverVM.invoke(() -> createServerCache()); // start a serverVM - private static void verifyProxyHasBeenPaused() { + this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); // create durable client1VM, with durable RI - WaitCriterion criterion = new WaitCriterion() { + this.serverVM.invoke(() -> doPuts()); // do puts on serverVM from three different threads, pause after 500 puts each. - @Override - public boolean done() { - CacheClientNotifier ccn = CacheClientNotifier.getInstance(); - Collection<CacheClientProxy> ccProxies = ccn.getClientProxies(); + this.client1VM.invoke(() -> clientCache.close(true)); // close durable client1VM - Iterator<CacheClientProxy> itr = ccProxies.iterator(); + this.serverVM.invoke(() -> await().atMost(2, MINUTES).until(() -> cacheClientProxyHasBeenPause())); - while (itr.hasNext()) { - CacheClientProxy ccp = itr.next(); - System.out.println("proxy status " + ccp.getState()); - if (ccp.isPaused()) - return true; - } - return false; - } + this.serverVM.invoke(() -> resumePuts()); // resume puts on serverVM, add another 100. - @Override - public String description() { - return "Proxy has not paused yet"; - } - }; + this.client1VM.invoke(() -> createClientCache(this.hostName, port, 1, true)); // start durable client1VM - Wait.waitForCriterion(criterion, 15 * 1000, 200, true); + this.client1VM.invoke(() -> await().atMost(1, MINUTES).until(() -> lastKeyReceived)); // wait for full queue dispatch + + this.serverVM.invoke(() -> verifyStats(port)); // verify the stats } - private static int createServerCache() throws Exception { - Cache cache = helper.createCache(false); + /** + * Invoked in serverVM + */ + private boolean cacheClientProxyHasBeenPause() { + CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); // TODO + //CacheClientNotifier clientNotifier = ((CacheServerImpl)this.managementTestRule.getCache().getCacheServers().get(0)).getAcceptor().getCacheClientNotifier(); - RegionFactory<String, String> rf = cache.createRegionFactory(RegionShortcut.REPLICATE); - rf.setConcurrencyChecksEnabled(false); - rf.create(REGION_NAME); + Collection<CacheClientProxy> clientProxies = clientNotifier.getClientProxies(); - CacheServer server1 = cache.addCacheServer(); - server1.setPort(0); - server1.start(); - return server1.getPort(); + for (CacheClientProxy clientProxy: clientProxies) { + if (clientProxy.isPaused()) { + return true; + } + } + return false; } - private static void closeClientCache() throws Exception { - cache.close(true); + /** + * Invoked in serverVM + */ + private int createServerCache() throws Exception { + Cache cache = this.managementTestRule.getCache(); + + RegionFactory<String, String> regionFactory = cache.createRegionFactory(RegionShortcut.REPLICATE); + regionFactory.setConcurrencyChecksEnabled(false); + regionFactory.create(REGION_NAME); + + CacheServer cacheServer = cache.addCacheServer(); + cacheServer.setPort(0); + cacheServer.start(); + return cacheServer.getPort(); } - private static void createClientCache(Host host, Integer port, int clientNum, - boolean subscriptionEnabled, boolean durable) throws Exception { + /** + * Invoked in client1VM and client2VM + */ + private void createClientCache(final String hostName, + final Integer port, + final int clientNum, + final boolean subscriptionEnabled) throws Exception { Properties props = new Properties(); - props.setProperty(DURABLE_CLIENT_ID, "durable-" + clientNum); - props.setProperty(DURABLE_CLIENT_TIMEOUT, "300000"); - props.setProperty(LOG_LEVEL, "info"); - props.setProperty(STATISTIC_ARCHIVE_FILE, - getTestMethodName() + "_client_" + clientNum + ".gfs"); props.setProperty(STATISTIC_SAMPLING_ENABLED, "true"); - ClientCacheFactory ccf = new ClientCacheFactory(props); + ClientCacheFactory cacheFactory = new ClientCacheFactory(props); if (subscriptionEnabled) { - ccf.setPoolSubscriptionEnabled(true); - ccf.setPoolSubscriptionAckInterval(50); - ccf.setPoolSubscriptionRedundancy(0); + cacheFactory.setPoolSubscriptionEnabled(true); + cacheFactory.setPoolSubscriptionAckInterval(50); + cacheFactory.setPoolSubscriptionRedundancy(0); } - if (durable) { - ccf.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum); - ccf.set(DURABLE_CLIENT_TIMEOUT, "" + 300); - } + cacheFactory.set(DURABLE_CLIENT_ID, "DurableClientId_" + clientNum); + cacheFactory.set(DURABLE_CLIENT_TIMEOUT, "" + 30000); - ccf.addPoolServer(host.getHostName(), port); - cache = (GemFireCacheImpl) ccf.create(); + cacheFactory.addPoolServer(hostName, port); + clientCache = cacheFactory.create(); - ClientRegionFactory<String, String> crf = - cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); - crf.setConcurrencyChecksEnabled(false); + ClientRegionFactory<String, String> regionFactory = clientCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); + regionFactory.setConcurrencyChecksEnabled(false); - crf.addCacheListener(new CacheListenerAdapter<String, String>() { - public void afterInvalidate(EntryEvent<String, String> event) { - cache.getLoggerI18n() - .fine("Invalidate Event: " + event.getKey() + ", " + event.getNewValue()); + regionFactory.addCacheListener(new CacheListenerAdapter<String, String>() { + @Override + public void afterInvalidate(final EntryEvent<String, String> event) { numOfInvalidates++; } - public void afterCreate(EntryEvent<String, String> event) { - if (((String) event.getKey()).equals("last_key")) { + @Override + public void afterCreate(final EntryEvent<String, String> event) { + if ("last_key".equals(event.getKey())) { lastKeyReceived = true; } - cache.getLoggerI18n().fine("Create Event: " + event.getKey() + ", " + event.getNewValue()); numOfCreates++; } - public void afterUpdate(EntryEvent<String, String> event) { - cache.getLoggerI18n().fine("Update Event: " + event.getKey() + ", " + event.getNewValue()); + @Override + public void afterUpdate(final EntryEvent<String, String> event) { numOfUpdates++; } }); - Region<String, String> r = crf.create(REGION_NAME); + Region<String, String> region = regionFactory.create(REGION_NAME); if (subscriptionEnabled) { - r.registerInterest("ALL_KEYS", true); - cache.readyForEvents(); + region.registerInterest("ALL_KEYS", true); + clientCache.readyForEvents(); } } - private static void doPuts() throws Exception { - Cache cache = GemFireCacheImpl.getInstance(); - final Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME); - Thread t1 = new Thread(new Runnable() { - public void run() { - for (int i = 0; i < 500; i++) { - r.put("T1_KEY_" + i, "VALUE_" + i); - } + /** + * Invoked in serverVM + */ + private void doPuts() throws Exception { + Cache cache = this.managementTestRule.getCache(); + Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + + Thread thread1 = new Thread(() -> { + for (int i = 0; i < NUMBER_PUTS; i++) { + region.put("T1_KEY_" + i, "VALUE_" + i); } }); - Thread t2 = new Thread(new Runnable() { - public void run() { - for (int i = 0; i < 500; i++) { - r.put("T2_KEY_" + i, "VALUE_" + i); - } + Thread thread2 = new Thread(() -> { + for (int i = 0; i < NUMBER_PUTS; i++) { + region.put("T2_KEY_" + i, "VALUE_" + i); } }); - Thread t3 = new Thread(new Runnable() { - public void run() { - for (int i = 0; i < 500; i++) { - r.put("T3_KEY_" + i, "VALUE_" + i); - } + Thread thread3 = new Thread(() -> { + for (int i = 0; i < NUMBER_PUTS; i++) { + region.put("T3_KEY_" + i, "VALUE_" + i); } }); - t1.start(); - t2.start(); - t3.start(); + thread1.start(); + thread2.start(); + thread3.start(); - t1.join(); - t2.join(); - t3.join(); + thread1.join(); + thread2.join(); + thread3.join(); } - private static void resumePuts() { - Cache cache = GemFireCacheImpl.getInstance(); - Region<String, String> r = cache.getRegion(Region.SEPARATOR + REGION_NAME); - for (int i = 0; i < 100; i++) { - r.put("NEWKEY_" + i, "NEWVALUE_" + i); + /** + * Invoked in serverVM + */ + private void resumePuts() { + Cache cache = this.managementTestRule.getCache(); + Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + + for (int i = 0; i < NUMBER_PUTS; i++) { + region.put("NEWKEY_" + i, "NEWVALUE_" + i); } - r.put("last_key", "last_value"); + region.put("last_key", "last_value"); } - private static void waitForLastKey() { - WaitCriterion wc = new WaitCriterion() { - @Override - public boolean done() { - return lastKeyReceived; - } + /** + * Invoked in managerVM + */ + private void verifyClientStats(final DistributedMember serverMember, final int serverPort, final int numSubscriptions) throws Exception { + ManagementService service = this.managementTestRule.getManagementService(); + CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort); - @Override - public String description() { - return "Did not receive last key."; - } - }; - Wait.waitForCriterion(wc, 60 * 1000, 500, true); - } + String[] clientIds = cacheServerMXBean.getClientIds(); + assertThat(clientIds).hasSize(2); - private static DistributedMember getMember() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return cache.getDistributedSystem().getDistributedMember(); - } + ClientHealthStatus[] clientStatuses = cacheServerMXBean.showAllClientStats(); - private static void verifyClientStats(DistributedMember serverMember, int serverPort, - int numSubscriptions) { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - try { - ManagementService service = ManagementService.getExistingManagementService(cache); - CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort); - - String[] clientIds = bean.getClientIds(); - assertTrue(clientIds.length == 2); - System.out.println( - "<ExpectedString> ClientId-1 of the Server is " + clientIds[0] + "</ExpectedString> "); - System.out.println( - "<ExpectedString> ClientId-2 of the Server is " + clientIds[1] + "</ExpectedString> "); - - ClientHealthStatus[] clientStatuses = bean.showAllClientStats(); - - ClientHealthStatus clientStatus1 = bean.showClientStats(clientIds[0]); - ClientHealthStatus clientStatus2 = bean.showClientStats(clientIds[1]); - assertNotNull(clientStatus1); - assertNotNull(clientStatus2); - System.out.println("<ExpectedString> ClientStats-1 of the Server is " + clientStatus1 - + "</ExpectedString> "); - System.out.println("<ExpectedString> ClientStats-2 of the Server is " + clientStatus2 - + "</ExpectedString> "); - - System.out - .println("<ExpectedString> clientStatuses " + clientStatuses + "</ExpectedString> "); - assertNotNull(clientStatuses); - - assertTrue(clientStatuses.length == 2); - for (ClientHealthStatus status : clientStatuses) { - System.out.println( - "<ExpectedString> ClientStats of the Server is " + status + "</ExpectedString> "); - } + ClientHealthStatus clientStatus1 = cacheServerMXBean.showClientStats(clientIds[0]); + ClientHealthStatus clientStatus2 = cacheServerMXBean.showClientStats(clientIds[1]); + assertThat(clientStatus1).isNotNull(); + assertThat(clientStatus2).isNotNull(); - DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); - assertEquals(2, dsBean.getNumClients()); - assertEquals(numSubscriptions, dsBean.getNumSubscriptions()); + assertThat(clientStatuses).isNotNull().hasSize(2); - } catch (Exception e) { - fail("Error while verifying cache server from remote member", e); - } + DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); + assertThat(dsBean.getNumClients()).isEqualTo(2); + assertThat(dsBean.getNumSubscriptions()).isEqualTo(numSubscriptions); } - private static void put() { - Cache cache = GemFireCacheImpl.getInstance(); - Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); - assertNotNull(r1); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } - r1.clear(); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - r1.clear(); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } + /** + * Invoked in client1VM and client2VM + */ + private void put() throws Exception { + Cache cache = (Cache)clientCache; + Region<String, String> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + + region.put(KEY1, VALUE1); + assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1); + + region.put(KEY2, VALUE2); + assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2); + + region.clear(); + + region.put(KEY1, VALUE1); + assertThat(region.getEntry(KEY1).getValue()).isEqualTo(VALUE1); + + region.put(KEY2, VALUE2); + assertThat(region.getEntry(KEY2).getValue()).isEqualTo(VALUE2); + + region.clear(); } - private static void verifyStats(int serverPort) throws Exception { - Cache cache = GemFireCacheImpl.getInstance(); - ManagementService service = ManagementService.getExistingManagementService(cache); + /** + * Invoked in serverVM + */ + private void verifyStats(final int serverPort) throws Exception { + ManagementService service = this.managementTestRule.getManagementService(); CacheServerMXBean serverBean = service.getLocalCacheServerMXBean(serverPort); - CacheClientNotifier ccn = CacheClientNotifier.getInstance(); - CacheClientProxy ccp = ccn.getClientProxies().iterator().next(); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, "getQueueSize() " + ccp.getQueueSize()); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, - "getQueueSizeStat() " + ccp.getQueueSizeStat()); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, - "getEventsEnqued() " + ccp.getHARegionQueue().getStatistics().getEventsEnqued()); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, - "getEventsDispatched() " + ccp.getHARegionQueue().getStatistics().getEventsDispatched()); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, - "getEventsRemoved() " + ccp.getHARegionQueue().getStatistics().getEventsRemoved()); - cache.getLoggerI18n().info(LocalizedStrings.DEBUG, - "getNumVoidRemovals() " + ccp.getHARegionQueue().getStatistics().getNumVoidRemovals()); - assertEquals(ccp.getQueueSize(), ccp.getQueueSizeStat()); + + CacheClientNotifier clientNotifier = CacheClientNotifier.getInstance(); + CacheClientProxy clientProxy = clientNotifier.getClientProxies().iterator().next(); + assertThat(clientProxy.getQueueSizeStat()).isEqualTo(clientProxy.getQueueSize()); + ClientQueueDetail queueDetails = serverBean.showClientQueueDetails()[0]; - assertEquals(queueDetails.getQueueSize(), ccp.getQueueSizeStat()); + assertThat(clientProxy.getQueueSizeStat()).isEqualTo((int)queueDetails.getQueueSize()); + } + + private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember, final int port) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName objectName = service.getCacheServerMBeanName(port, serverMember); + + await().until(() -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull()); + + return service.getMBeanProxy(objectName, CacheServerMXBean.class); + } + + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); } }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java index 9c33003..3176bda 100644 --- a/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/CompositeTypeTestDUnitTest.java @@ -14,163 +14,86 @@ */ package org.apache.geode.management; -import org.junit.experimental.categories.Category; -import org.junit.Test; +import static java.util.concurrent.TimeUnit.*; +import static org.assertj.core.api.Assertions.*; -import static org.junit.Assert.*; +import java.io.Serializable; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; - -import javax.management.MalformedObjectNameException; import javax.management.ObjectName; -import org.apache.geode.internal.cache.GemFireCacheImpl; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + import org.apache.geode.management.internal.MBeanJMXAdapter; -import org.apache.geode.management.internal.ManagementConstants; import org.apache.geode.management.internal.SystemManagementService; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; +import org.apache.geode.test.junit.categories.DistributedTest; @Category(DistributedTest.class) -public class CompositeTypeTestDUnitTest extends ManagementTestBase { +@SuppressWarnings({ "serial", "unused" }) +public class CompositeTypeTestDUnitTest implements Serializable { - public CompositeTypeTestDUnitTest() { - super(); - // TODO Auto-generated constructor stub - } + @Manager + private VM managerVM; - /** - * - */ - private static final long serialVersionUID = 1L; + @Member + private VM memberVM; - private static ObjectName objectName; + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build(); - @Category(FlakyTest.class) // GEODE-1492 @Test - public void testCompositeTypeGetters() throws Exception { + public void testCompositeTypeGetters() throws Exception{ + registerMBeanWithCompositeTypeGetters(this.memberVM); - initManagement(false); - String member = getMemberId(managedNode1); - member = MBeanJMXAdapter.makeCompliantName(member); - - registerMBeanWithCompositeTypeGetters(managedNode1, member); + String memberName = MBeanJMXAdapter.makeCompliantName(getMemberId(this.memberVM)); + verifyMBeanWithCompositeTypeGetters(this.managerVM, memberName); + } + private void registerMBeanWithCompositeTypeGetters(final VM memberVM) throws Exception { + memberVM.invoke("registerMBeanWithCompositeTypeGetters", () -> { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); - checkMBeanWithCompositeTypeGetters(managingNode, member); + ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite"); + CompositeTestMXBean compositeTestMXBean = new CompositeTestMBean(); + objectName = service.registerMBean(compositeTestMXBean, objectName); + service.federate(objectName, CompositeTestMXBean.class, false); + }); } + private void verifyMBeanWithCompositeTypeGetters(final VM managerVM, final String memberId) throws Exception { + managerVM.invoke("verifyMBeanWithCompositeTypeGetters", () -> { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite,member=" + memberId); - /** - * Creates a Local region - * - * @param vm reference to VM - */ - protected void registerMBeanWithCompositeTypeGetters(VM vm, final String memberID) - throws Exception { - SerializableRunnable regMBean = - new SerializableRunnable("Register CustomMBean with composite Type") { - public void run() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - SystemManagementService service = (SystemManagementService) getManagementService(); - - try { - ObjectName objectName = new ObjectName("GemFire:service=custom,type=composite"); - CompositeTestMXBean mbean = new CompositeTestMBean(); - objectName = service.registerMBean(mbean, objectName); - service.federate(objectName, CompositeTestMXBean.class, false); - } catch (MalformedObjectNameException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NullPointerException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - - - } - }; - vm.invoke(regMBean); - } + await().until(() -> service.getMBeanInstance(objectName, CompositeTestMXBean.class) != null); + + CompositeTestMXBean compositeTestMXBean = service.getMBeanInstance(objectName, CompositeTestMXBean.class); + assertThat(compositeTestMXBean).isNotNull(); + CompositeStats listCompositeStatsData = compositeTestMXBean.listCompositeStats(); + assertThat(listCompositeStatsData).isNotNull(); - /** - * Creates a Local region - * - * @param vm reference to VM - */ - protected void checkMBeanWithCompositeTypeGetters(VM vm, final String memberID) throws Exception { - SerializableRunnable checkMBean = - new SerializableRunnable("Check CustomMBean with composite Type") { - public void run() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - final SystemManagementService service = - (SystemManagementService) getManagementService(); - - try { - final ObjectName objectName = - new ObjectName("GemFire:service=custom,type=composite,member=" + memberID); - - Wait.waitForCriterion(new WaitCriterion() { - public String description() { - return "Waiting for Composite Type MBean"; - } - - public boolean done() { - CompositeTestMXBean bean = - service.getMBeanInstance(objectName, CompositeTestMXBean.class); - boolean done = (bean != null); - return done; - } - - }, ManagementConstants.REFRESH_TIME * 4, 500, true); - - - CompositeTestMXBean bean = - service.getMBeanInstance(objectName, CompositeTestMXBean.class); - - CompositeStats listData = bean.listCompositeStats(); - - System.out.println("connectionStatsType = " + listData.getConnectionStatsType()); - System.out.println("connectionsOpened = " + listData.getConnectionsOpened()); - System.out.println("connectionsClosed = " + listData.getConnectionsClosed()); - System.out.println("connectionsAttempted = " + listData.getConnectionsAttempted()); - System.out.println("connectionsFailed = " + listData.getConnectionsFailed()); - - CompositeStats getsData = bean.getCompositeStats(); - System.out.println("connectionStatsType = " + getsData.getConnectionStatsType()); - System.out.println("connectionsOpened = " + getsData.getConnectionsOpened()); - System.out.println("connectionsClosed = " + getsData.getConnectionsClosed()); - System.out.println("connectionsAttempted = " + getsData.getConnectionsAttempted()); - System.out.println("connectionsFailed = " + getsData.getConnectionsFailed()); - - CompositeStats[] arrayData = bean.getCompositeArray(); - Integer[] intArrayData = bean.getIntegerArray(); - Thread.sleep(2 * 60 * 1000); - } catch (MalformedObjectNameException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (NullPointerException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); - } - - - - } - }; - vm.invoke(checkMBean); + CompositeStats getCompositeStatsData = compositeTestMXBean.getCompositeStats(); + assertThat(getCompositeStatsData).isNotNull(); + + CompositeStats[] getCompositeArrayData = compositeTestMXBean.getCompositeArray(); + assertThat(getCompositeArrayData).isNotNull().isNotEmpty(); + + Integer[] getIntegerArrayData = compositeTestMXBean.getIntegerArray(); + assertThat(getIntegerArrayData).isNotNull().isNotEmpty(); + }); } + private String getMemberId(final VM memberVM) { + return memberVM.invoke("getMemberId", () -> this.managementTestRule.getDistributedMember().getId()); + } + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f3b5664c/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java index 86501c3..9c8e5c9 100644 --- a/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/DLockManagementDUnitTest.java @@ -14,452 +14,272 @@ */ package org.apache.geode.management; -import org.junit.experimental.categories.Category; -import org.junit.Test; - -import static org.junit.Assert.*; - -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; +import static java.util.concurrent.TimeUnit.*; +import static org.apache.geode.internal.process.ProcessUtils.*; +import static org.assertj.core.api.Assertions.*; +import java.io.Serializable; import java.util.Map; import java.util.Set; import javax.management.ObjectName; +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.Rule; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.internal.InternalDistributedSystem; import org.apache.geode.distributed.internal.locks.DLockService; -import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.management.internal.MBeanJMXAdapter; import org.apache.geode.management.internal.SystemManagementService; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.junit.categories.FlakyTest; +import org.apache.geode.test.junit.categories.DistributedTest; @Category(DistributedTest.class) -public class DLockManagementDUnitTest extends ManagementTestBase { +@SuppressWarnings({ "serial", "unused" }) +public class DLockManagementDUnitTest implements Serializable { - private static final long serialVersionUID = 1L; + private static final int MAX_WAIT_MILLIS = 120 * 1000; // 2 MINUTES - private static final String LOCK_SERVICE_NAME = "testLockService"; + private static final String LOCK_SERVICE_NAME = DLockManagementDUnitTest.class.getSimpleName() + "_testLockService"; - // This must be bigger than the dunit ack-wait-threshold for the revoke - // tests. The command line is setting the ack-wait-threshold to be - // 60 seconds. - private static final int MAX_WAIT = 70 * 1000; + @Manager + private VM managerVM; - public DLockManagementDUnitTest() { - super(); + @Member + private VM[] memberVMs; - } + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().managersFirst(false).start(true).build(); - /** - * Distributed Lock Service test - * - * @throws Exception - */ - @Category(FlakyTest.class) // GEODE-173: eats exceptions, HeadlessGFSH, time sensitive, - // waitForCriterions @Test - public void testDLockMBean() throws Throwable { + public void testLockServiceMXBean() throws Throwable { + createLockServiceGrantor(this.memberVMs[0]); + createLockService(this.memberVMs[1]); + createLockService(this.memberVMs[2]); - initManagement(false); - - VM[] managedNodes = new VM[getManagedNodeList().size()]; - VM managingNode = getManagingNode(); - - getManagedNodeList().toArray(managedNodes); - - createGrantorLockService(managedNodes[0]); - - createLockService(managedNodes[1]); - - createLockService(managedNodes[2]); - - for (VM vm : getManagedNodeList()) { - verifyLockData(vm); + for (VM memberVM : this.memberVMs) { + verifyLockServiceMXBeanInMember(memberVM); } - verifyLockDataRemote(managingNode); + verifyLockServiceMXBeanInManager(this.managerVM); - for (VM vm : getManagedNodeList()) { - closeLockService(vm); + for (VM memberVM : this.memberVMs) { + closeLockService(memberVM); } } - /** - * Distributed Lock Service test - * - * @throws Exception - */ - @Category(FlakyTest.class) // GEODE-553: waitForCriterion, eats exceptions, HeadlessGFSH @Test - public void testDLockAggregate() throws Throwable { - initManagement(false); - VM[] managedNodes = new VM[getManagedNodeList().size()]; - VM managingNode = getManagingNode(); - - getManagedNodeList().toArray(managedNodes); - - createGrantorLockService(managedNodes[0]); + public void testDistributedLockServiceMXBean() throws Throwable { + createLockServiceGrantor(this.memberVMs[0]); + createLockService(this.memberVMs[1]); + createLockService(this.memberVMs[2]); - createLockService(managedNodes[1]); + verifyDistributedLockServiceMXBean(this.managerVM, 3); - createLockService(managedNodes[2]); + DistributedMember member = this.managementTestRule.getDistributedMember(this.memberVMs[2]); + verifyFetchOperations(this.managerVM, member); - checkAggregate(managingNode, 3); - DistributedMember member = getMember(managedNodes[2]); - checkNavigation(managingNode, member); + createLockService(this.managerVM); + verifyDistributedLockServiceMXBean(this.managerVM, 4); - createLockService(managingNode); - checkAggregate(managingNode, 4); - - - for (VM vm : getManagedNodeList()) { - closeLockService(vm); + for (VM memberVM : this.memberVMs) { + closeLockService(memberVM); } - ensureProxyCleanup(managingNode); - checkAggregate(managingNode, 1); - closeLockService(managingNode); - checkAggregate(managingNode, 0); + verifyProxyCleanupInManager(this.managerVM); + verifyDistributedLockServiceMXBean(this.managerVM, 1); + closeLockService(this.managerVM); + verifyDistributedLockServiceMXBean(this.managerVM, 0); } - public void ensureProxyCleanup(final VM vm) { - - SerializableRunnable ensureProxyCleanup = new SerializableRunnable("Ensure Proxy cleanup") { - public void run() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - Set<DistributedMember> otherMemberSet = - cache.getDistributionManager().getOtherNormalDistributionManagerIds(); - final SystemManagementService service = (SystemManagementService) getManagementService(); - - - for (final DistributedMember member : otherMemberSet) { - RegionMXBean bean = null; - try { - - Wait.waitForCriterion(new WaitCriterion() { - - LockServiceMXBean bean = null; - - public String description() { - return "Waiting for the proxy to get deleted at managing node"; - } - - public boolean done() { - ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME); - bean = service.getMBeanProxy(objectName, LockServiceMXBean.class); - boolean done = (bean == null); - return done; - } - - }, MAX_WAIT, 500, true); - - } catch (Exception e) { - throw new AssertionError("could not remove proxies in required time", e); - - } - assertNull(bean); - - } + private void verifyProxyCleanupInManager(final VM managerVM) { + managerVM.invoke("verifyProxyCleanupInManager", () -> { + Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + for (final DistributedMember member : otherMembers) { + ObjectName objectName = service.getRegionMBeanName(member, LOCK_SERVICE_NAME); + await().until(() -> assertThat(service.getMBeanProxy(objectName, LockServiceMXBean.class)).isNull()); } - }; - vm.invoke(ensureProxyCleanup); + }); } - /** - * Creates a grantor lock service - * - * @param vm - */ - @SuppressWarnings("serial") - protected void createGrantorLockService(final VM vm) { - SerializableRunnable createGrantorLockService = - new SerializableRunnable("Create Grantor LockService") { - public void run() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)); - - DLockService service = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME, - cache.getDistributedSystem()); + private void createLockServiceGrantor(final VM memberVM) { + memberVM.invoke("createLockServiceGrantor", () -> { + assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull(); - assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)); + DLockService lockService = (DLockService) DistributedLockService.create(LOCK_SERVICE_NAME, this.managementTestRule.getCache().getDistributedSystem()); + DistributedMember grantor = lockService.getLockGrantorId().getLockGrantorMember(); + assertThat(grantor).isNotNull(); - InternalDistributedMember grantor = service.getLockGrantorId().getLockGrantorMember(); + LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME); - assertNotNull(grantor); + assertThat(lockServiceMXBean).isNotNull(); + assertThat(lockServiceMXBean.isDistributed()).isTrue(); + assertThat(lockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME); + assertThat(lockServiceMXBean.isLockGrantor()).isTrue(); + assertThat(lockServiceMXBean.fetchGrantorMember()).isEqualTo(this.managementTestRule.getDistributedMember().getId()); + }); + } - LogWriterUtils.getLogWriter().info("In identifyLockGrantor - grantor is " + grantor); + private void createLockService(final VM anyVM) { + anyVM.invoke("createLockService", () -> { + assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNull(); + DistributedLockService.create(LOCK_SERVICE_NAME, this.managementTestRule.getCache().getDistributedSystem()); + LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBean(LOCK_SERVICE_NAME); - ManagementService mgmtService = getManagementService(); + assertThat(lockServiceMXBean).isNotNull(); + assertThat(lockServiceMXBean.isDistributed()).isTrue(); + assertThat(lockServiceMXBean.isLockGrantor()).isFalse(); + }); + } - LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME); + private void closeLockService(final VM anyVM) { + anyVM.invoke("closeLockService", () -> { + assertThat(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)).isNotNull(); + DistributedLockService.destroy(LOCK_SERVICE_NAME); - assertNotNull(bean); + awaitLockServiceMXBeanIsNull(LOCK_SERVICE_NAME); - assertTrue(bean.isDistributed()); + ManagementService service = this.managementTestRule.getManagementService(); + LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME); + assertThat(lockServiceMXBean).isNull(); + }); + } - assertEquals(bean.getName(), LOCK_SERVICE_NAME); + private void verifyLockServiceMXBeanInMember(final VM memberVM) { + memberVM.invoke("verifyLockServiceMXBeanInManager", () -> { + DistributedLockService lockService = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME); + lockService.lock("lockObject_" + identifyPid(), MAX_WAIT_MILLIS, -1); - assertTrue(bean.isLockGrantor()); + ManagementService service = this.managementTestRule.getManagementService(); + LockServiceMXBean lockServiceMXBean = service.getLocalLockServiceMBean(LOCK_SERVICE_NAME); + assertThat(lockServiceMXBean).isNotNull(); - assertEquals(cache.getDistributedSystem().getMemberId(), bean.fetchGrantorMember()); + String[] listHeldLock = lockServiceMXBean.listHeldLocks(); + assertThat(listHeldLock).hasSize(1); - } - }; - vm.invoke(createGrantorLockService); + Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock(); + assertThat(lockThreadMap).hasSize(1); + }); } /** - * Creates a named lock service - * - * @param vm + * Verify lock data from remote Managing node */ - @SuppressWarnings("serial") - protected void createLockService(final VM vm) { - SerializableRunnable createLockService = new SerializableRunnable("Create LockService") { - public void run() { - assertNull(DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)); - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - DistributedLockService service = - DistributedLockService.create(LOCK_SERVICE_NAME, cache.getDistributedSystem()); + private void verifyLockServiceMXBeanInManager(final VM managerVM) throws Exception { + managerVM.invoke("verifyLockServiceMXBeanInManager", () -> { + Set<DistributedMember> otherMembers = this.managementTestRule.getOtherNormalMembers(); - assertSame(service, DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME)); + for (DistributedMember member : otherMembers) { + LockServiceMXBean lockServiceMXBean = awaitLockServiceMXBeanProxy(member, LOCK_SERVICE_NAME); + assertThat(lockServiceMXBean).isNotNull(); + String[] listHeldLock = lockServiceMXBean.listHeldLocks(); + assertThat(listHeldLock).hasSize(1); + Map<String, String> lockThreadMap = lockServiceMXBean.listThreadsHoldingLock(); + assertThat(lockThreadMap).hasSize(1); + } + }); + } - ManagementService mgmtService = getManagementService(); - - LockServiceMXBean bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME); - - assertNotNull(bean); + private void verifyFetchOperations(final VM memberVM, final DistributedMember member) { + memberVM.invoke("verifyFetchOperations", () -> { + ManagementService service = this.managementTestRule.getManagementService(); - assertTrue(bean.isDistributed()); + DistributedSystemMXBean distributedSystemMXBean = awaitDistributedSystemMXBean(); + ObjectName distributedLockServiceMXBeanName = MBeanJMXAdapter.getDistributedLockServiceName(LOCK_SERVICE_NAME); + assertThat(distributedSystemMXBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME)).isEqualTo(distributedLockServiceMXBeanName); - assertFalse(bean.isLockGrantor()); - } - }; - vm.invoke(createLockService); + ObjectName lockServiceMXBeanName = MBeanJMXAdapter.getLockServiceMBeanName(member.getId(), LOCK_SERVICE_NAME); + assertThat(distributedSystemMXBean.fetchLockServiceObjectName(member.getId(), LOCK_SERVICE_NAME)).isEqualTo(lockServiceMXBeanName); + }); } /** - * Closes a named lock service - * - * @param vm + * Verify Aggregate MBean */ - @SuppressWarnings("serial") - protected void closeLockService(final VM vm) { - SerializableRunnable closeLockService = new SerializableRunnable("Close LockService") { - public void run() { - - DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME); - - DistributedLockService.destroy(LOCK_SERVICE_NAME); + private void verifyDistributedLockServiceMXBean(final VM managerVM, final int memberCount) { + managerVM.invoke("verifyDistributedLockServiceMXBean", () -> { + ManagementService service = this.managementTestRule.getManagementService(); - ManagementService mgmtService = getManagementService(); - - LockServiceMXBean bean = null; - try { + if (memberCount == 0) { + await().until(() -> assertThat(service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME)).isNull()); + return; + } - bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME); + DistributedLockServiceMXBean distributedLockServiceMXBean = awaitDistributedLockServiceMXBean(LOCK_SERVICE_NAME, memberCount); + assertThat(distributedLockServiceMXBean).isNotNull(); + assertThat(distributedLockServiceMXBean.getName()).isEqualTo(LOCK_SERVICE_NAME); + }); + } - } catch (ManagementException mgs) { + private DistributedSystemMXBean awaitDistributedSystemMXBean() { + ManagementService service = this.managementTestRule.getManagementService(); - } - assertNull(bean); + await().until(() -> assertThat(service.getDistributedSystemMXBean()).isNotNull()); - } - }; - vm.invoke(closeLockService); + return service.getDistributedSystemMXBean(); } /** - * Lock data related verifications - * - * @param vm + * Await and return a DistributedRegionMXBean proxy with specified member + * count. */ - @SuppressWarnings("serial") - protected void verifyLockData(final VM vm) { - SerializableRunnable verifyLockData = new SerializableRunnable("Verify LockService") { - public void run() { - - DistributedLockService service = DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME); - - final String LOCK_OBJECT = "lockObject_" + vm.getPid(); - - Wait.waitForCriterion(new WaitCriterion() { - DistributedLockService service = null; - - public String description() { - return "Waiting for the lock service to be initialised"; - } - - public boolean done() { - DistributedLockService service = - DistributedLockService.getServiceNamed(LOCK_SERVICE_NAME); - boolean done = service != null; - return done; - } - - }, MAX_WAIT, 500, true); - - service.lock(LOCK_OBJECT, 1000, -1); - + private DistributedLockServiceMXBean awaitDistributedLockServiceMXBean(final String lockServiceName, final int memberCount) { + ManagementService service = this.managementTestRule.getManagementService(); - ManagementService mgmtService = getManagementService(); + await().until(() -> { + assertThat(service.getDistributedLockServiceMXBean(lockServiceName)).isNotNull(); + assertThat(service.getDistributedLockServiceMXBean(lockServiceName).getMemberCount()).isEqualTo(memberCount); + }); - LockServiceMXBean bean = null; - try { - - bean = mgmtService.getLocalLockServiceMBean(LOCK_SERVICE_NAME); - - } catch (ManagementException mgs) { - - } - assertNotNull(bean); - String[] listHeldLock = bean.listHeldLocks(); - assertEquals(listHeldLock.length, 1); - LogWriterUtils.getLogWriter().info("List Of Lock Object is " + listHeldLock[0]); - Map<String, String> lockThreadMap = bean.listThreadsHoldingLock(); - assertEquals(lockThreadMap.size(), 1); - LogWriterUtils.getLogWriter().info("List Of Lock Thread is " + lockThreadMap.toString()); - } - }; - vm.invoke(verifyLockData); + return service.getDistributedLockServiceMXBean(lockServiceName); } /** - * Verify lock data from remote Managing node - * - * @param vm + * Await and return a LockServiceMXBean proxy for a specific member and + * lockServiceName. */ - @SuppressWarnings("serial") - protected void verifyLockDataRemote(final VM vm) { - SerializableRunnable verifyLockDataRemote = - new SerializableRunnable("Verify LockService Remote") { - public void run() { - - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - Set<DistributedMember> otherMemberSet = - cache.getDistributionManager().getOtherNormalDistributionManagerIds(); - - for (DistributedMember member : otherMemberSet) { - LockServiceMXBean bean = null; - try { - bean = MBeanUtil.getLockServiceMbeanProxy(member, LOCK_SERVICE_NAME); - } catch (Exception e) { - InternalDistributedSystem.getLoggerI18n() - .fine("Undesired Result , LockServiceMBean Should not be null", e); - - } - assertNotNull(bean); - String[] listHeldLock = bean.listHeldLocks(); - assertEquals(listHeldLock.length, 1); - LogWriterUtils.getLogWriter().info("List Of Lock Object is " + listHeldLock[0]); - Map<String, String> lockThreadMap = bean.listThreadsHoldingLock(); - assertEquals(lockThreadMap.size(), 1); - LogWriterUtils.getLogWriter() - .info("List Of Lock Thread is " + lockThreadMap.toString()); - } - - } - }; - vm.invoke(verifyLockDataRemote); - } + private LockServiceMXBean awaitLockServiceMXBeanProxy(final DistributedMember member, final String lockServiceName) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName lockServiceMXBeanName = service.getLockServiceMBeanName(member, lockServiceName); - protected void checkNavigation(final VM vm, final DistributedMember lockServiceMember) { - SerializableRunnable checkNavigation = new SerializableRunnable("Check Navigation") { - public void run() { - - final ManagementService service = getManagementService(); - - DistributedSystemMXBean disMBean = service.getDistributedSystemMXBean(); - try { - ObjectName expected = MBeanJMXAdapter.getDistributedLockServiceName(LOCK_SERVICE_NAME); - ObjectName actual = disMBean.fetchDistributedLockServiceObjectName(LOCK_SERVICE_NAME); - assertEquals(expected, actual); - } catch (Exception e) { - throw new AssertionError("Lock Service Navigation Failed ", e); - } - - try { - ObjectName expected = - MBeanJMXAdapter.getLockServiceMBeanName(lockServiceMember.getId(), LOCK_SERVICE_NAME); - ObjectName actual = - disMBean.fetchLockServiceObjectName(lockServiceMember.getId(), LOCK_SERVICE_NAME); - assertEquals(expected, actual); - } catch (Exception e) { - throw new AssertionError("Lock Service Navigation Failed ", e); - } + await().until(() -> assertThat(service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class)).isNotNull()); - } - }; - vm.invoke(checkNavigation); + return service.getMBeanProxy(lockServiceMXBeanName, LockServiceMXBean.class); } /** - * Verify Aggregate MBean - * - * @param vm + * Await creation of local LockServiceMXBean for specified lockServiceName. */ - @SuppressWarnings("serial") - protected void checkAggregate(final VM vm, final int expectedMembers) { - SerializableRunnable checkAggregate = new SerializableRunnable("Verify Aggregate MBean") { - public void run() { - - final ManagementService service = getManagementService(); - if (expectedMembers == 0) { - try { - Wait.waitForCriterion(new WaitCriterion() { - - DistributedLockServiceMXBean bean = null; + private LockServiceMXBean awaitLockServiceMXBean(final String lockServiceName) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); - public String description() { - return "Waiting for the proxy to get deleted at managing node"; - } + await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNotNull()); - public boolean done() { - bean = service.getDistributedLockServiceMXBean(LOCK_SERVICE_NAME); - - boolean done = (bean == null); - return done; - } - - }, MAX_WAIT, 500, true); - - } catch (Exception e) { - throw new AssertionError("could not remove Aggregate Bean in required time", e); - - } - return; - } + return service.getLocalLockServiceMBean(lockServiceName); + } - DistributedLockServiceMXBean bean = null; - try { - bean = MBeanUtil.getDistributedLockMbean(LOCK_SERVICE_NAME, expectedMembers); - } catch (Exception e) { - InternalDistributedSystem.getLoggerI18n() - .fine("Undesired Result , LockServiceMBean Should not be null", e); + /** + * Await destruction of local LockServiceMXBean for specified + * lockServiceName. + */ + private void awaitLockServiceMXBeanIsNull(final String lockServiceName) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); - } - assertNotNull(bean); - assertEquals(bean.getName(), LOCK_SERVICE_NAME); + await().until(() -> assertThat(service.getLocalLockServiceMBean(lockServiceName)).isNull()); + } - } - }; - vm.invoke(checkAggregate); + private ConditionFactory await() { + return Awaitility.await().atMost(MAX_WAIT_MILLIS, MILLISECONDS); } } +