http://git-wip-us.apache.org/repos/asf/geode/blob/662b500e/geode-core/src/test/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java index d2e797e..5b71b1e 100644 --- a/geode-core/src/test/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/bean/stats/DistributedSystemStatsDUnitTest.java @@ -14,98 +14,70 @@ */ package org.apache.geode.management.bean.stats; -import org.junit.experimental.categories.Category; -import org.junit.Test; - +import static com.jayway.awaitility.Awaitility.*; import static org.junit.Assert.*; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; -import org.apache.geode.test.junit.categories.DistributedTest; - +import java.lang.management.ManagementFactory; import java.util.Set; +import java.util.concurrent.TimeUnit; import javax.management.ObjectName; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.internal.cache.DiskStoreStats; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.management.DistributedSystemMXBean; -import org.apache.geode.management.ManagementTestBase; +import org.apache.geode.management.ManagementService; +import org.apache.geode.management.ManagementTestRule; +import org.apache.geode.management.Manager; +import org.apache.geode.management.Member; import org.apache.geode.management.MemberMXBean; import org.apache.geode.management.internal.SystemManagementService; -import org.apache.geode.management.internal.beans.MemberMBean; -import org.apache.geode.management.internal.beans.MemberMBeanBridge; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.junit.categories.DistributedTest; -/** - */ @Category(DistributedTest.class) -public class DistributedSystemStatsDUnitTest extends ManagementTestBase { +@SuppressWarnings({"unused", "serial"}) +public class DistributedSystemStatsDUnitTest { - private static final long serialVersionUID = 1L; + @Manager + private VM manager; - public DistributedSystemStatsDUnitTest() { - super(); - } + @Member + private VM[] members; + + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(true).build(); @Test public void testDistributedSystemStats() throws Exception { - initManagement(true); - - for (VM vm : managedNodeList) { - setDiskStats(vm); - } - verifyDiskStats(managingNode); - } - - @SuppressWarnings("serial") - public void setDiskStats(VM vm1) throws Exception { - vm1.invoke(new SerializableRunnable("Set Member Stats") { - public void run() { - MemberMBean bean = (MemberMBean) managementService.getMemberMXBean(); - MemberMBeanBridge bridge = bean.getBridge(); - DiskStoreStats diskStoreStats = new DiskStoreStats(basicGetSystem(), "test"); - bridge.addDiskStoreStats(diskStoreStats); - diskStoreStats.startRead(); - diskStoreStats.startWrite(); - diskStoreStats.startBackup(); - diskStoreStats.startRecovery(); - diskStoreStats.incWrittenBytes(20, true); - diskStoreStats.startFlush(); - diskStoreStats.setQueueSize(10); - } - }); - } - - @SuppressWarnings("serial") - public void verifyDiskStats(VM vm1) throws Exception { - vm1.invoke(new SerializableRunnable("Set Member Stats") { - public void run() { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - - SystemManagementService service = (SystemManagementService) getManagementService(); - DistributedSystemMXBean bean = service.getDistributedSystemMXBean(); - assertNotNull(bean); - Set<DistributedMember> otherMemberSet = - cache.getDistributionManager().getOtherNormalDistributionManagerIds(); - - for (DistributedMember member : otherMemberSet) { - ObjectName memberMBeanName; - try { - memberMBeanName = service.getMemberMBeanName(member); - waitForProxy(memberMBeanName, MemberMXBean.class); - MemberMXBean memberBean = service.getMBeanProxy(memberMBeanName, MemberMXBean.class); - waitForRefresh(2, memberMBeanName); - } catch (NullPointerException e) { - Assert.fail("FAILED WITH EXCEPION", e); - } catch (Exception e) { - Assert.fail("FAILED WITH EXCEPION", e); - } - } - + this.manager.invoke("verifyMBeans", () -> { + GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + assertNotNull(cache); + + SystemManagementService service = + (SystemManagementService) ManagementService.getManagementService(cache); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + assertNotNull(distributedSystemMXBean); + + Set<DistributedMember> otherMemberSet = + cache.getDistributionManager().getOtherNormalDistributionManagerIds(); + assertEquals(3, otherMemberSet.size()); + + for (DistributedMember member : otherMemberSet) { + ObjectName memberMXBeanName = service.getMemberMBeanName(member); + await().atMost(2, TimeUnit.MINUTES).until(() -> assertTrue( + ManagementFactory.getPlatformMBeanServer().isRegistered(memberMXBeanName))); + + MemberMXBean memberMXBean = service.getMBeanProxy(memberMXBeanName, MemberMXBean.class); + assertNotNull(memberMXBean); + + final long lastRefreshTime = service.getLastUpdateTime(memberMXBeanName); + await().atMost(1, TimeUnit.MINUTES) + .until(() -> assertTrue(service.getLastUpdateTime(memberMXBeanName) > lastRefreshTime)); } }); }
http://git-wip-us.apache.org/repos/asf/geode/blob/662b500e/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java index 9416616..a7940a4 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestClientIdsDUnitTest.java @@ -14,11 +14,22 @@ */ package org.apache.geode.management.internal.pulse; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.geode.distributed.ConfigurationProperties.*; +import static org.apache.geode.test.dunit.NetworkUtils.getServerHostName; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.*; +import java.io.IOException; +import java.io.Serializable; import java.util.Properties; +import javax.management.ObjectName; + +import com.jayway.awaitility.Awaitility; +import com.jayway.awaitility.core.ConditionFactory; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -29,273 +40,141 @@ import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.client.ClientRegionFactory; +import org.apache.geode.cache.client.ClientRegionShortcut; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; import org.apache.geode.distributed.DistributedMember; -import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.management.CacheServerMXBean; -import org.apache.geode.management.MBeanUtil; -import org.apache.geode.management.ManagementTestBase; -import org.apache.geode.management.internal.cli.CliUtil; +import org.apache.geode.management.ManagementTestRule; +import org.apache.geode.management.Manager; +import org.apache.geode.management.internal.SystemManagementService; import org.apache.geode.test.dunit.Assert; import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.NetworkUtils; import org.apache.geode.test.dunit.SerializableCallable; import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; -import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; /** * This is for testing client IDs */ @Category(DistributedTest.class) -public class TestClientIdsDUnitTest extends JUnit4DistributedTestCase { - - private static final String k1 = "k1"; - private static final String k2 = "k2"; - - private static final String client_k1 = "client-k1"; - - private static final String client_k2 = "client-k2"; - - /** name of the test region */ - private static final String REGION_NAME = "ClientHealthStatsDUnitTest_Region"; - - private static VM server = null; - - private static VM client = null; - - private static VM client2 = null; - - private static VM managingNode = null; - - private ManagementTestBase helper; - - @Override - public final void preSetUp() throws Exception { - this.helper = new ManagementTestBase() {}; - } - - @Override - public final void postSetUp() throws Exception { - final Host host = Host.getHost(0); - managingNode = host.getVM(0); - server = host.getVM(1); - client = host.getVM(2); - client2 = host.getVM(3); - } - - @Override - public final void preTearDown() throws Exception { - helper.closeCache(managingNode); - helper.closeCache(server); - helper.closeCache(client); - helper.closeCache(client2); - - disconnectFromDS(); +@SuppressWarnings({"serial", "unused"}) +public class TestClientIdsDUnitTest implements Serializable { + + private static final String KEY1 = "KEY1"; + private static final String KEY2 = "KEY2"; + private static final String VALUE1 = "client-KEY1"; + private static final String VALUE2 = "client-KEY2"; + private static final String REGION_NAME = + TestClientIdsDUnitTest.class.getSimpleName() + "_Region"; + + @Manager + private VM managerVM; + + private VM serverVM; + private VM client1VM; + private VM client2VM; + + @Rule + public ManagementTestRule managementTestRule = ManagementTestRule.builder().start(false).build(); + + @Before + public void before() throws Exception { + serverVM = Host.getHost(0).getVM(1); + client1VM = Host.getHost(0).getVM(2); + client2VM = Host.getHost(0).getVM(3); } @Test public void testClientIds() throws Exception { - helper.createManagementCache(managingNode); - helper.startManagingNode(managingNode); - int port = (Integer) createServerCache(server); - DistributedMember serverMember = helper.getMember(server); - createClientCache(client, NetworkUtils.getServerHostName(server.getHost()), port); - createClientCache(client2, NetworkUtils.getServerHostName(server.getHost()), port); - put(client); - put(client2); - verifyClientIds(managingNode, serverMember, port); - helper.stopManagingNode(managingNode); - } + this.managementTestRule.createManagers(); - @SuppressWarnings("serial") - private Object createServerCache(VM vm) { - return vm.invoke(new SerializableCallable("Create Server Cache") { - public Object call() { - try { - return createServerCache(); - } catch (Exception e) { - fail("Error while createServerCache " + e); - } - return null; - } - }); - } + int port = this.serverVM.invoke(() -> createServerCache()); + + this.client1VM + .invoke(() -> createClientCache(getServerHostName(this.serverVM.getHost()), port)); + this.client2VM + .invoke(() -> createClientCache(getServerHostName(this.serverVM.getHost()), port)); - @SuppressWarnings("serial") - private void createClientCache(VM vm, final String host, final Integer port1) { - vm.invoke(new SerializableCallable("Create Client Cache") { + DistributedMember serverMember = this.managementTestRule.getDistributedMember(this.serverVM); + DistributedMember client1Member = this.managementTestRule.getDistributedMember(this.client1VM); + DistributedMember client2Member = this.managementTestRule.getDistributedMember(this.client2VM); - public Object call() { + // this.managerVM.invoke(() -> verifyClientIds(serverMember, port)); + this.managerVM.invoke(() -> { + CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, port); + await().until(() -> { try { - createClientCache(host, port1); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); } catch (Exception e) { - fail("Error while createClientCache " + e); + throw new Error(e); } - return null; - } + }); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); // TODO }); } - private Cache createCache(Properties props) throws Exception { - DistributedSystem ds = getSystem(props); - ds.disconnect(); - ds = getSystem(props); - assertNotNull(ds); - Cache cache = (GemFireCacheImpl) CacheFactory.create(ds); - assertNotNull(cache); - return cache; - } + private int createServerCache() throws IOException { + Cache cache = this.managementTestRule.getCache(); - private Integer createServerCache(DataPolicy dataPolicy) throws Exception { - Cache cache = helper.createCache(false); AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setDataPolicy(dataPolicy); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attrs = factory.create(); cache.createRegion(REGION_NAME, attrs); - int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - CacheServer server1 = cache.addCacheServer(); - server1.setPort(port); - server1.setNotifyBySubscription(true); - server1.start(); - return new Integer(server1.getPort()); - } - public Integer createServerCache() throws Exception { - return createServerCache(DataPolicy.REPLICATE); + CacheServer cacheServer = cache.addCacheServer(); + cacheServer.setPort(0); + cacheServer.setNotifyBySubscription(true); + cacheServer.start(); + return cacheServer.getPort(); } - public Cache createClientCache(String host, Integer port1) throws Exception { + private void createClientCache(final String host, final int serverPort) { + ClientCache cache = this.managementTestRule.getClientCache(); - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - Cache cache = createCache(props); - PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(host, port1.intValue()) + PoolImpl pool = (PoolImpl) PoolManager.createFactory().addServer(host, serverPort) .setSubscriptionEnabled(false).setThreadLocalConnections(true).setMinConnections(1) .setReadTimeout(20000).setPingInterval(10000).setRetryAttempts(1) - .setSubscriptionEnabled(true).setStatisticInterval(1000) - .create("CacheServerManagementDUnitTest"); - - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setPoolName(p.getName()); - - RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(REGION_NAME, attrs); - return cache; + .setSubscriptionEnabled(true).setStatisticInterval(1000).create(getClass().getSimpleName()); + ClientRegionFactory factory = + cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY); + factory.setPoolName(pool.getName()); + factory.create(REGION_NAME); } - /** - * get member id - */ - @SuppressWarnings("serial") - protected static DistributedMember getMember() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return cache.getDistributedSystem().getDistributedMember(); + private void verifyClientIds(final DistributedMember serverMember, final int serverPort) + throws Exception { + CacheServerMXBean cacheServerMXBean = awaitCacheServerMXBean(serverMember, serverPort); + await().until(() -> { + try { + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); + } catch (Exception e) { + throw new Error(e); + } + }); + assertThat(cacheServerMXBean.getClientIds()).hasSize(2); // TODO } - /** - * Verify the Cache Server details - * - * @param vm - */ - @SuppressWarnings("serial") - protected void verifyClientIds(final VM vm, final DistributedMember serverMember, - final int serverPort) { - SerializableRunnable verifyCacheServerRemote = - new SerializableRunnable("Verify Cache Server Remote") { - public void run() { - try { - final WaitCriterion waitCriteria = new WaitCriterion() { - @Override - public boolean done() { - CacheServerMXBean bean = null; - try { - bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort); - if (bean != null) { - if (bean.getClientIds().length > 0) { - return true; - } - } - } catch (Exception e) { - LogWriterUtils.getLogWriter().info("exception occured " + e.getMessage() - + CliUtil.stackTraceAsString((Throwable) e)); - } - return false; - } + private CacheServerMXBean awaitCacheServerMXBean(final DistributedMember serverMember, + final int port) { + SystemManagementService service = this.managementTestRule.getSystemManagementService(); + ObjectName objectName = service.getCacheServerMBeanName(port, serverMember); - @Override - public String description() { - return "wait for getNumOfClients bean to complete and get results"; - } - }; - Wait.waitForCriterion(waitCriteria, 2 * 60 * 1000, 3000, true); + await().until( + () -> assertThat(service.getMBeanProxy(objectName, CacheServerMXBean.class)).isNotNull()); - // Now it is sure that bean would be available - CacheServerMXBean bean = MBeanUtil.getCacheServerMbeanProxy(serverMember, serverPort); - LogWriterUtils.getLogWriter().info("verifyClientIds = " + bean.getClientIds().length); - assertEquals(true, bean.getClientIds().length > 0 ? true : false); - } catch (Exception e) { - fail("Error while verifying cache server from remote member " + e); - } - } - }; - vm.invoke(verifyCacheServerRemote); + return service.getMBeanProxy(objectName, CacheServerMXBean.class); } - /** - * Verify the Cache Server details - * - * @param vm - */ - @SuppressWarnings("serial") - protected void put(final VM vm) { - SerializableRunnable put = new SerializableRunnable("put") { - public void run() { - try { - Cache cache = GemFireCacheImpl.getInstance(); - Region<Object, Object> r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); - assertNotNull(r1); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } - r1.clear(); - - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - try { - Thread.sleep(10000); - } catch (Exception e) { - // sleep - } - r1.clear(); - } catch (Exception ex) { - Assert.fail("failed while put", ex); - } - } - - }; - vm.invoke(put); + private ConditionFactory await() { + return Awaitility.await().atMost(2, MINUTES); } - } http://git-wip-us.apache.org/repos/asf/geode/blob/662b500e/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestSubscriptionsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestSubscriptionsDUnitTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestSubscriptionsDUnitTest.java index 7d96517..52c7b9c 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestSubscriptionsDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/pulse/TestSubscriptionsDUnitTest.java @@ -15,6 +15,9 @@ package org.apache.geode.management.internal.pulse; import static org.apache.geode.distributed.ConfigurationProperties.*; +import static org.apache.geode.test.dunit.Host.*; +import static org.apache.geode.test.dunit.NetworkUtils.*; +import static org.apache.geode.test.dunit.Wait.*; import static org.junit.Assert.*; import java.util.Properties; @@ -32,268 +35,173 @@ import org.apache.geode.cache.Scope; import org.apache.geode.cache.client.PoolManager; import org.apache.geode.cache.client.internal.PoolImpl; import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; -import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.management.DistributedSystemMXBean; import org.apache.geode.management.ManagementService; import org.apache.geode.management.ManagementTestBase; -import org.apache.geode.test.dunit.Assert; -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.LogWriterUtils; -import org.apache.geode.test.dunit.NetworkUtils; -import org.apache.geode.test.dunit.SerializableCallable; -import org.apache.geode.test.dunit.SerializableRunnable; import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; /** * This is for testing subscriptions */ @Category(DistributedTest.class) -public class TestSubscriptionsDUnitTest extends JUnit4DistributedTestCase { +@SuppressWarnings("serial") +public class TestSubscriptionsDUnitTest extends ManagementTestBase { - private static final String k1 = "k1"; - private static final String k2 = "k2"; - private static final String client_k1 = "client-k1"; - - private static final String client_k2 = "client-k2"; private static final String REGION_NAME = TestSubscriptionsDUnitTest.class.getSimpleName() + "_Region"; + + private static final String KEY1 = "k1"; + private static final String KEY2 = "k2"; + private static final String CLIENT_VALUE1 = "client-k1"; + private static final String CLIENT_VALUE2 = "client-k2"; + private static VM server = null; private static VM client = null; private static VM client2 = null; - private static VM managingNode = null; - private ManagementTestBase helper; - - @Override - public final void preSetUp() throws Exception { - this.helper = new ManagementTestBase() {}; - } @Override - public final void postSetUp() throws Exception { - final Host host = Host.getHost(0); - managingNode = host.getVM(0); - server = host.getVM(1); - client = host.getVM(2); - client2 = host.getVM(3); - } - - @Override - public final void preTearDown() throws Exception { - helper.closeCache(managingNode); - helper.closeCache(server); - helper.closeCache(client); - helper.closeCache(client2); - disconnectFromDS(); + public final void postSetUpManagementTestBase() throws Exception { + server = getHost(0).getVM(1); + client = getHost(0).getVM(2); + client2 = getHost(0).getVM(3); } @Test - public void testNoOfSubscription() throws Exception { + public void testNumSubscriptions() throws Exception { + createManagementCache(managingNode); + startManagingNode(managingNode); + + int port = createServerCache(server); + getMember(server); - helper.createManagementCache(managingNode); - helper.startManagingNode(managingNode); + createClientCache(client, getServerHostName(server.getHost()), port); + createClientCache(client2, getServerHostName(server.getHost()), port); - int port = (Integer) createServerCache(server); - DistributedMember serverMember = helper.getMember(server); - createClientCache(client, NetworkUtils.getServerHostName(server.getHost()), port); - createClientCache(client2, NetworkUtils.getServerHostName(server.getHost()), port); put(client); put(client2); + registerInterest(client); registerInterest(client2); - verifyClientStats(managingNode, serverMember, port); - helper.stopManagingNode(managingNode); - } - @SuppressWarnings("serial") - private Object createServerCache(VM vm) { - return vm.invoke(new SerializableCallable("Create Server Cache in TestSubscriptionsDUnitTest") { + verifyNumSubscriptions(managingNode); - public Object call() { - try { - return createServerCache(); - } catch (Exception e) { - fail("Error while createServerCache in TestSubscriptionsDUnitTest" + e); - } - return null; - } - }); + stopManagingNode(managingNode); } - @SuppressWarnings("serial") - private void createClientCache(VM vm, final String host, final Integer port1) { - vm.invoke(new SerializableCallable("Create Client Cache in TestSubscriptionsDUnitTest") { + private int createServerCache(VM vm) { + return vm.invoke("Create Server Cache in TestSubscriptionsDUnitTest", () -> { + return createServerCache(); + }); + } - public Object call() { - try { - createClientCache(host, port1); - } catch (Exception e) { - fail("Error while createClientCache in TestSubscriptionsDUnitTest " + e); - } - return null; - } + private void createClientCache(VM vm, final String host, final int port1) { + vm.invoke("Create Client Cache in TestSubscriptionsDUnitTest", () -> { + createClientCache(host, port1); }); } private Cache createCache(Properties props) throws Exception { DistributedSystem ds = getSystem(props); - ds.disconnect(); - ds = getSystem(props); - assertNotNull(ds); - Cache cache = (GemFireCacheImpl) CacheFactory.create(ds); - assertNotNull(cache); + Cache cache = CacheFactory.create(ds); return cache; } - private Integer createServerCache(DataPolicy dataPolicy) throws Exception { - Cache cache = helper.createCache(false); + private int createServerCache(DataPolicy dataPolicy) throws Exception { + Cache cache = createCache(false); + AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); factory.setDataPolicy(dataPolicy); - RegionAttributes attrs = factory.create(); - cache.createRegion(REGION_NAME, attrs); - int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + + cache.createRegion(REGION_NAME, factory.create()); + CacheServer server1 = cache.addCacheServer(); - server1.setPort(port); + server1.setPort(0); server1.setNotifyBySubscription(true); server1.start(); - return new Integer(server1.getPort()); + + return server1.getPort(); } - public Integer createServerCache() throws Exception { + private int createServerCache() throws Exception { return createServerCache(DataPolicy.REPLICATE); } - public Cache createClientCache(String host, Integer port1) throws Exception { - + private Cache createClientCache(String host, int port1) throws Exception { Properties props = new Properties(); props.setProperty(MCAST_PORT, "0"); props.setProperty(LOCATORS, ""); + Cache cache = createCache(props); - PoolImpl p = (PoolImpl) PoolManager.createFactory().addServer(host, port1.intValue()) - .setSubscriptionEnabled(true).setThreadLocalConnections(true).setMinConnections(1) - .setReadTimeout(20000).setPingInterval(10000).setRetryAttempts(1) - .setSubscriptionEnabled(true).setStatisticInterval(1000) - .create("TestSubscriptionsDUnitTest"); + + PoolImpl p = + (PoolImpl) PoolManager.createFactory().addServer(host, port1).setSubscriptionEnabled(true) + .setThreadLocalConnections(true).setMinConnections(1).setReadTimeout(20000) + .setPingInterval(10000).setRetryAttempts(1).setSubscriptionEnabled(true) + .setStatisticInterval(1000).create("TestSubscriptionsDUnitTest"); AttributesFactory factory = new AttributesFactory(); factory.setScope(Scope.DISTRIBUTED_ACK); factory.setPoolName(p.getName()); RegionAttributes attrs = factory.create(); - Region region = cache.createRegion(REGION_NAME, attrs); - return cache; + cache.createRegion(REGION_NAME, attrs); + return cache; } - /** - * get member id - */ - @SuppressWarnings("serial") - protected static DistributedMember getMember() throws Exception { - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - return cache.getDistributedSystem().getDistributedMember(); - } + private void verifyNumSubscriptions(final VM vm) { + vm.invoke("TestSubscriptionsDUnitTest Verify Cache Server Remote", () -> { + final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + + waitForCriterion(new WaitCriterion() { + @Override + public boolean done() { + ManagementService service = ManagementService.getExistingManagementService(cache); + DistributedSystemMXBean distributedSystemMXBean = service.getDistributedSystemMXBean(); + return distributedSystemMXBean != null + & distributedSystemMXBean.getNumSubscriptions() > 1; + } - /** - * Verify the Cache Server details - * - * @param vm - */ - @SuppressWarnings("serial") - protected void verifyClientStats(final VM vm, final DistributedMember serverMember, - final int serverPort) { - SerializableRunnable verifyCacheServerRemote = - new SerializableRunnable("TestSubscriptionsDUnitTest Verify Cache Server Remote") { - public void run() { - final GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - try { - final WaitCriterion waitCriteria = new WaitCriterion() { - @Override - public boolean done() { - ManagementService service = ManagementService.getExistingManagementService(cache); - final DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); - if (dsBean != null) { - if (dsBean.getNumSubscriptions() > 1) { - return true; - } - } - return false; - } - - @Override - public String description() { - return "TestSubscriptionsDUnitTest wait for getDistributedSystemMXBean to complete and get results"; - } - }; - Wait.waitForCriterion(waitCriteria, 2 * 60 * 1000, 3000, true); - final DistributedSystemMXBean dsBean = ManagementService - .getExistingManagementService(cache).getDistributedSystemMXBean(); - assertNotNull(dsBean); - LogWriterUtils.getLogWriter() - .info("TestSubscriptionsDUnitTest dsBean.getNumSubscriptions() =" - + dsBean.getNumSubscriptions()); - assertTrue(dsBean.getNumSubscriptions() == 2 ? true : false); - } catch (Exception e) { - fail("TestSubscriptionsDUnitTest Error while verifying subscription " - + e.getMessage()); - } - - } - }; - vm.invoke(verifyCacheServerRemote); - } + @Override + public String description() { + return "TestSubscriptionsDUnitTest wait for getDistributedSystemMXBean to complete and get results"; + } + }, 2 * 60 * 1000, 3000, true); - /** - * Verify the Cache Server details - * - * @param vm - */ - @SuppressWarnings("serial") - protected void registerInterest(final VM vm) { - SerializableRunnable put = - new SerializableRunnable("TestSubscriptionsDUnitTest registerInterest") { - public void run() { - try { - Cache cache = GemFireCacheImpl.getInstance(); - Region<Object, Object> r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); - assertNotNull(r1); - r1.registerInterest(k1); - r1.registerInterest(k2); - } catch (Exception ex) { - Assert.fail("TestSubscriptionsDUnitTest failed while register Interest", ex); - } - } - - }; - vm.invoke(put); + DistributedSystemMXBean distributedSystemMXBean = + ManagementService.getExistingManagementService(cache).getDistributedSystemMXBean(); + assertNotNull(distributedSystemMXBean); + assertEquals(2, distributedSystemMXBean.getNumSubscriptions()); + }); } - @SuppressWarnings("serial") - protected void put(final VM vm) { - SerializableRunnable put = new SerializableRunnable("put") { - public void run() { - try { - Cache cache = GemFireCacheImpl.getInstance(); - Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME); - assertNotNull(r1); - r1.put(k1, client_k1); - assertEquals(r1.getEntry(k1).getValue(), client_k1); - r1.put(k2, client_k2); - assertEquals(r1.getEntry(k2).getValue(), client_k2); - } catch (Exception ex) { - Assert.fail("failed while put", ex); - } - } + private void registerInterest(final VM vm) { + vm.invoke("TestSubscriptionsDUnitTest registerInterest", () -> { + Cache cache = GemFireCacheImpl.getInstance(); + Region<Object, Object> region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(region); - }; - vm.invoke(put); + region.registerInterest(KEY1); + region.registerInterest(KEY2); + }); } + private void put(final VM vm) { + vm.invoke("put", () -> { + Cache cache = GemFireCacheImpl.getInstance(); + Region region = cache.getRegion(Region.SEPARATOR + REGION_NAME); + assertNotNull(region); + + region.put(KEY1, CLIENT_VALUE1); + assertEquals(CLIENT_VALUE1, region.getEntry(KEY1).getValue()); + + region.put(KEY2, CLIENT_VALUE2); + assertEquals(CLIENT_VALUE2, region.getEntry(KEY2).getValue()); + }); + } } http://git-wip-us.apache.org/repos/asf/geode/blob/662b500e/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java index ef4681c..b3d4814 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/management/LuceneManagementDUnitTest.java @@ -144,13 +144,13 @@ public class LuceneManagementDUnitTest extends ManagementTestBase { } } - private static void verifyMBean() { + private void verifyMBean() { getMBean(); } - private static LuceneServiceMXBean getMBean() { - ObjectName objectName = - MBeanJMXAdapter.getCacheServiceMBeanName(ds.getDistributedMember(), "LuceneService"); + private LuceneServiceMXBean getMBean() { + ObjectName objectName = MBeanJMXAdapter + .getCacheServiceMBeanName(getSystem().getDistributedMember(), "LuceneService"); assertNotNull(getManagementService().getMBeanInstance(objectName, LuceneServiceMXBean.class)); return getManagementService().getMBeanInstance(objectName, LuceneServiceMXBean.class); } @@ -179,14 +179,14 @@ public class LuceneManagementDUnitTest extends ManagementTestBase { createPartitionRegion(vm, regionName); } - private static void createIndexes(String regionName, int numIndexes) { - LuceneService luceneService = LuceneServiceProvider.get(cache); + private void createIndexes(String regionName, int numIndexes) { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); for (int i = 0; i < numIndexes; i++) { luceneService.createIndex(INDEX_NAME + "_" + i, regionName, "field" + i); } } - private static void verifyAllMBeanIndexMetrics(String regionName, int numRegionIndexes, + private void verifyAllMBeanIndexMetrics(String regionName, int numRegionIndexes, int numTotalIndexes) { LuceneServiceMXBean mbean = getMBean(); verifyMBeanIndexMetrics(mbean, regionName, numRegionIndexes, numTotalIndexes); @@ -211,18 +211,17 @@ public class LuceneManagementDUnitTest extends ManagementTestBase { } } - private static void putEntries(String regionName, int numEntries) { + private void putEntries(String regionName, int numEntries) { for (int i = 0; i < numEntries; i++) { - Region region = cache.getRegion(regionName); + Region region = getCache().getRegion(regionName); String key = String.valueOf(i); Object value = new TestObject(key); region.put(key, value); } } - private static void queryEntries(String regionName, String indexName) - throws LuceneQueryException { - LuceneService service = LuceneServiceProvider.get(cache); + private void queryEntries(String regionName, String indexName) throws LuceneQueryException { + LuceneService service = LuceneServiceProvider.get(getCache()); LuceneQuery query = service.createLuceneQueryFactory().create(indexName, regionName, "field0:0", null); query.findValues();