Repository: incubator-geode Updated Branches: refs/heads/feature/GEODE-835 29861e4fd -> 7e559224a
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java index 176fbea..9ef87d2 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java @@ -16,6 +16,17 @@ */ package com.gemstone.gemfire.cache.client.internal; +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; + import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.client.PoolManager; import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest; @@ -39,31 +50,24 @@ import com.gemstone.gemfire.internal.logging.LocalLogWriter; import com.gemstone.gemfire.test.dunit.Host; import com.gemstone.gemfire.test.dunit.LogWriterUtils; import com.gemstone.gemfire.test.dunit.NetworkUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.VM; -import com.jayway.awaitility.Awaitility; -import org.junit.Assert; - -import java.io.IOException; -import java.io.Serializable; -import java.net.InetAddress; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; /** * */ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { - + /** * The number of connections that we can be off by in the balancing tests * We need this little fudge factor, because the locator can receive an update * from the bridge server after it has made incremented its counter for a client * connection, but the client hasn't connected yet. This wipes out the estimation * on the locator. This means that we may be slighly off in our balance. - * <p> + * * TODO grid fix this hole in the locator. */ private static final int ALLOWABLE_ERROR_IN_COUNT = 1; @@ -74,144 +78,149 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { } /** - * Test the locator discovers a bridge server and is initialized with + * Test the locator discovers a bridge server and is initialized with * the correct load for that bridge server. */ public void testDiscovery() { Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); - + VM vm2 = host.getVM(2); +// vm0.invoke(new SerializableRunnable() { +// public void run() { +// System.setProperty("gemfire.DistributionAdvisor.VERBOSE", "true"); +// } +// }); + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); - + startLocatorInVM(vm0, locatorPort, ""); + String locators = getLocatorString(host, locatorPort); - - int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - + + int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); + ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(vm0 .getHost()), serverPort); Map expected = new HashMap(); expected.put(expectedLocation, expectedLoad); - - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - - int serverPort2 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - + + checkLocatorLoad(vm0, expected); + + int serverPort2 = startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); + ServerLocation expectedLocation2 = new ServerLocation(NetworkUtils.getServerHostName(vm0 .getHost()), serverPort2); - + expected.put(expectedLocation2, expectedLoad); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + checkLocatorLoad(vm0, expected); } - + /** * Test that the locator will properly estimate the load for servers when - * it receives connection requests. + * it receives connection requests. */ - public void testEstimation() throws IOException, ClassNotFoundException { + public void testEstimation() throws UnknownHostException, IOException, ClassNotFoundException { Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + startLocatorInVM(vm0, locatorPort, ""); String locators = getLocatorString(host, locatorPort); - - int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - - ServerLoad expectedLoad = new ServerLoad(2 / 800f, 1 / 800.0f, 0f, 1f); + + int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); + + ServerLoad expectedLoad = new ServerLoad(2/800f, 1 / 800.0f, 0f, 1f); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort); Map expected = new HashMap(); expected.put(expectedLocation, expectedLoad); - + ClientConnectionResponse response; response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000); Assert.assertEquals(expectedLocation, response.getServer()); - + response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true); Assert.assertEquals(expectedLocation, response.getServer()); - + //we expect that the connection load load will be 2 * the loadPerConnection - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - + checkLocatorLoad(vm0, expected); + QueueConnectionResponse response2; response2 = (QueueConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new QueueConnectionRequest(null, 2, Collections.EMPTY_SET, null, false), 10000, true); Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers()); - + response2 = (QueueConnectionResponse) TcpClient .requestToServer(InetAddress.getByName(NetworkUtils.getServerHostName(host)), locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null, false), 10000, true); - + Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers()); //we expect that the queue load will increase by 2 expectedLoad.setSubscriptionConnectionLoad(2f); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + checkLocatorLoad(vm0, expected); } - + /** * Test to make sure the bridge servers communicate * their updated load to the controller when the load * on the bridge server changes. - * - * @throws Exception + * @throws Exception */ public void testLoadMessaging() throws Exception { final Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + startLocatorInVM(vm0, locatorPort, ""); String locators = getLocatorString(host, locatorPort); - - final int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - + + final int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); + //We expect 0 load Map expected = new HashMap(); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort); ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); expected.put(expectedLocation, expectedLoad); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - + checkLocatorLoad(vm0, expected); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addServer(NetworkUtils.getServerHostName(host), serverPort); pf.setMinConnections(8); pf.setMaxConnections(8); pf.setSubscriptionEnabled(true); - vm2.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME })); - + startBridgeClientInVM(vm2, pf.getPoolAttributes(), new String[] {REGION_NAME}); + //We expect 8 client to server connections. The queue requires //an additional client to server connection, but that shouldn't show up here. - expectedLoad = new ServerLoad(8 / 800f, 1 / 800.0f, 1f, 1f); + expectedLoad = new ServerLoad(8/800f, 1 / 800.0f, 1f, 1f); expected.put(expectedLocation, expectedLoad); - - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - + + + checkLocatorLoad(vm0, expected); + stopBridgeMemberVM(vm2); - + //Now we expect 0 load expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); expected.put(expectedLocation, expectedLoad); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + checkLocatorLoad(vm0, expected); } - + /** * Test to make sure that the locator * balancing load between two servers. - * - * @throws Exception + * @throws Exception */ public void testBalancing() throws Exception { final Host host = Host.getHost(0); @@ -219,60 +228,87 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + startLocatorInVM(vm0, locatorPort, ""); String locators = getLocatorString(host, locatorPort); - - vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - + + startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); + startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(80); pf.setMaxConnections(80); pf.setSubscriptionEnabled(false); pf.setIdleTimeout(-1); - vm3.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME })); - - vm3.invoke("waitForPrefilledConnections", () -> waitForPrefilledConnections(80)); - - vm1.invoke("check connection count", () -> checkConnectionCount(40)); - vm2.invoke("check connection count", () -> checkConnectionCount(40)); + startBridgeClientInVM(vm3, pf.getPoolAttributes(), new String[] {REGION_NAME}); + + waitForPrefilledConnections(vm3, 80); + + checkConnectionCount(vm1, 40); + checkConnectionCount(vm2, 40); } - private void checkConnectionCount(final int count) { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - final CacheServerImpl server = (CacheServerImpl) - cache.getCacheServers().get(0); - Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) - .timeout(300, TimeUnit.SECONDS).until(() -> { - int sz = server.getAcceptor().getStats().getCurrentClientConnections(); - if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) { - return true; + private void checkConnectionCount(VM vm, final int count) { + SerializableRunnableIF checkConnectionCount = new SerializableRunnable("checkConnectionCount") { + public void run() { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + final CacheServerImpl server = (CacheServerImpl) + cache.getCacheServers().get(0); + WaitCriterion wc = new WaitCriterion() { + String excuse; + public boolean done() { + int sz = server.getAcceptor().getStats() + .getCurrentClientConnections(); + if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) { + return true; + } + excuse = "Found " + sz + " connections, expected " + count; + return false; + } + public String description() { + return excuse; + } + }; + Wait.waitForCriterion(wc, 5 * 60 * 1000, 1000, true); } - System.out.println("Found " + sz + " connections, expected " + count); - return false; - }); + }; + + vm.invoke(checkConnectionCount); } - - private void waitForPrefilledConnections(final int count) throws Exception { - waitForPrefilledConnections(count, POOL_NAME); + + private void waitForPrefilledConnections(VM vm, final int count) throws Exception { + waitForPrefilledConnections(vm, count, POOL_NAME); } - private void waitForPrefilledConnections(final int count, final String poolName) throws Exception { - final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName); - Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) - .timeout(300, TimeUnit.SECONDS).until(() -> pool.getConnectionCount() >= count); + private void waitForPrefilledConnections(VM vm, final int count, final String poolName) throws Exception { + SerializableRunnable runnable = new SerializableRunnable("waitForPrefilledConnections") { + public void run() { + final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName); + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return pool.getConnectionCount() >= count; + } + public String description() { + return "connection count never reached " + count; + } + }; + Wait.waitForCriterion(ev, MAX_WAIT, 200, true); + } + }; + if(vm == null) { + runnable.run(); + } else { + vm.invoke(runnable); + } } - - /** - * Test that the locator balances load between + + /** Test that the locator balances load between * three servers with intersecting server groups. * Server: 1 2 3 * Groups: a a,b b - * - * @throws Exception + * @throws Exception */ public void testIntersectingServerGroups() throws Exception { final Host host = Host.getHost(0); @@ -280,158 +316,175 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + startLocatorInVM(vm0, locatorPort, ""); String locators = getLocatorString(host, locatorPort); - - int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a" }, locators)); - vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); - vm3.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "b" }, locators)); - + + int serverPort1 = startBridgeServerInVM(vm1, new String[] {"a"}, locators); + startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); + startBridgeServerInVM(vm3, new String[] {"b"}, locators); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(12); pf.setSubscriptionEnabled(false); pf.setServerGroup("a"); pf.setIdleTimeout(-1); - startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }); - waitForPrefilledConnections(12); - - vm1.invoke("Check Connection Count", () -> checkConnectionCount(6)); - vm2.invoke("Check Connection Count", () -> checkConnectionCount(6)); - vm3.invoke("Check Connection Count", () -> checkConnectionCount(0)); - + startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME}); + waitForPrefilledConnections(null, 12); + + checkConnectionCount(vm1, 6); + checkConnectionCount(vm2, 6); + checkConnectionCount(vm3, 0); + LogWriterUtils.getLogWriter().info("pool1 prefilled"); - + PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory(); pf2.init(pf.getPoolAttributes()); pf2.setServerGroup("b"); - PoolImpl pool2 = (PoolImpl) pf2.create("testPool2"); - waitForPrefilledConnections(12, "testPool2"); + PoolImpl pool2= (PoolImpl) pf2.create("testPool2"); + waitForPrefilledConnections(null, 12, "testPool2"); // The load will not be perfect, because we created all of the connections //for group A first. - vm1.invoke("Check Connection Count", () -> checkConnectionCount(6)); - vm2.invoke("Check Connection Count", () -> checkConnectionCount(9)); - vm3.invoke("Check Connection Count", () -> checkConnectionCount(9)); - + checkConnectionCount(vm1, 6); + checkConnectionCount(vm2, 9); + checkConnectionCount(vm3, 9); + LogWriterUtils.getLogWriter().info("pool2 prefilled"); - + ServerLocation location1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1); PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME); Assert.assertEquals("a", pool1.getServerGroup()); - + //Use up all of the pooled connections on pool1, and acquire 3 more - for (int i = 0; i < 15; i++) { + for(int i = 0; i < 15; i++) { pool1.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("aquired 15 connections in pool1"); - + //now the load should be equal - vm1.invoke("Check Connection Count", () -> checkConnectionCount(9)); - vm2.invoke("Check Connection Count", () -> checkConnectionCount(9)); - vm3.invoke("Check Connection Count", () -> checkConnectionCount(9)); - + checkConnectionCount(vm1, 9); + checkConnectionCount(vm2, 9); + checkConnectionCount(vm3, 9); + //use up all of the pooled connections on pool2 - for (int i = 0; i < 12; i++) { + for(int i = 0; i < 12; i++) { pool2.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("aquired 12 connections in pool2"); - + //interleave creating connections in both pools - for (int i = 0; i < 6; i++) { + for(int i = 0; i < 6; i++) { pool1.acquireConnection(); pool2.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("interleaved 6 connections from pool1 with 6 connections from pool2"); - + //The load should still be balanced - vm1.invoke("Check Connection Count", () -> checkConnectionCount(13)); - vm2.invoke("Check Connection Count", () -> checkConnectionCount(13)); - vm3.invoke("Check Connection Count", () -> checkConnectionCount(13)); - + checkConnectionCount(vm1, 13); + checkConnectionCount(vm2, 13); + checkConnectionCount(vm3, 13); + } - + public void testCustomLoadProbe() throws Exception { final Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); - // VM vm3 = host.getVM(3); - +// VM vm3 = host.getVM(3); + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + startLocatorInVM(vm0, locatorPort, ""); String locators = getLocatorString(host, locatorPort); - - final ServerLoad load1 = new ServerLoad(.3f, .01f, .44f, 4564f); - final ServerLoad load2 = new ServerLoad(23.2f, 1.1f, 22.3f, .3f); - int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load1))); - int serverPort2 = vm2.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load2))); - + + ServerLoad load1= new ServerLoad(.3f, .01f, .44f, 4564f); + ServerLoad load2= new ServerLoad(23.2f, 1.1f, 22.3f, .3f); + int serverPort1 = startBridgeServerInVM(vm1, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load1 )); + int serverPort2 = startBridgeServerInVM(vm2, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load2 )); + HashMap expected = new HashMap(); ServerLocation l1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1); ServerLocation l2 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort2); expected.put(l1, load1); expected.put(l2, load2); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - + checkLocatorLoad(vm0, expected); + load1.setConnectionLoad(25f); - vm1.invoke("changeLoad", () -> changeLoad(load1)); + changeLoad(vm1, load1); load2.setSubscriptionConnectionLoad(3.5f); - vm2.invoke("changeLoad", () -> changeLoad(load2)); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - - final ServerLoad load1Updated = new ServerLoad(1f, .1f, 0f, 1f); - final ServerLoad load2Updated = new ServerLoad(2f, 5f, 0f, 2f); - expected.put(l1, load1Updated); - expected.put(l2, load2Updated); - vm1.invoke("changeLoad", () -> changeLoad(load1Updated)); - vm2.invoke("changeLoad", () -> changeLoad(load2Updated)); - vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); - + changeLoad(vm2, load2); + checkLocatorLoad(vm0, expected); + + load1 = new ServerLoad(1f, .1f, 0f, 1f); + load2 = new ServerLoad(2f, 5f, 0f, 2f); + expected.put(l1, load1); + expected.put(l2, load2); + changeLoad(vm1, load1); + changeLoad(vm2, load2); + checkLocatorLoad(vm0, expected); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(20); pf.setSubscriptionEnabled(true); pf.setIdleTimeout(-1); - startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }); - waitForPrefilledConnections(20); - + startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME}); + waitForPrefilledConnections(null, 20); + //The first 10 connection should to go vm1, then 1 to vm2, then another 9 to vm1 //because have unequal values for loadPerConnection - vm1.invoke("Check Connection Count", () -> checkConnectionCount(19)); - vm2.invoke("Check Connection Count", () -> checkConnectionCount(1)); + checkConnectionCount(vm1, 19); + checkConnectionCount(vm2, 1); } - - public void checkLocatorLoad(final Map expected) { - List locators = Locator.getLocators(); - Assert.assertEquals(1, locators.size()); - InternalLocator locator = (InternalLocator) locators.get(0); - final ServerLocator sl = locator.getServerLocatorAdvisee(); - InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out); - sl.getDistributionAdvisor().dumpProfiles("PROFILES= "); - Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) - .timeout(300, TimeUnit.SECONDS).until(() -> expected.equals(sl.getLoadMap())); + + public void checkLocatorLoad(VM vm, final Map expected) { + vm.invoke(new SerializableRunnable() { + public void run() { + List locators = Locator.getLocators(); + Assert.assertEquals(1, locators.size()); + InternalLocator locator = (InternalLocator) locators.get(0); + final ServerLocator sl = locator.getServerLocatorAdvisee(); + InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out); + sl.getDistributionAdvisor().dumpProfiles("PROFILES= "); + WaitCriterion ev = new WaitCriterion() { + public boolean done() { + return expected.equals(sl.getLoadMap()); + } + public String description() { + return "load map never became equal to " + expected; + } + }; + Wait.waitForCriterion(ev, MAX_WAIT, 200, true); + } + }); } - - private void changeLoad(final ServerLoad newLoad) { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - CacheServer server = cache.getCacheServers().get(0); - MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe(); - probe.setLoad(newLoad); + + private void changeLoad(VM vm, final ServerLoad newLoad) { + vm.invoke(new SerializableRunnable() { + + public void run() { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + CacheServer server = (CacheServer) cache.getCacheServers().get(0); + MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe(); + probe.setLoad(newLoad); + } + + }); } - + private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable { private ServerLoad load; - + public MyLoadProbe(ServerLoad load) { this.load = load; } - + public ServerLoad getLoad(ServerMetrics metrics) { float connectionLoad = load.getConnectionLoad() + metrics.getConnectionCount() * load.getLoadPerConnection(); @@ -440,7 +493,7 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { return new ServerLoad(connectionLoad, load.getLoadPerConnection(), queueLoad, load.getLoadPerSubscriptionConnection()); } - + public void setLoad(ServerLoad load) { this.load = load; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java index af5ba9c..2207e1d 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java @@ -16,35 +16,52 @@ */ package com.gemstone.gemfire.cache.client.internal; -import com.gemstone.gemfire.cache.*; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.Scope; import com.gemstone.gemfire.cache.client.Pool; import com.gemstone.gemfire.cache.client.PoolManager; -import com.gemstone.gemfire.cache.server.CacheServer; import com.gemstone.gemfire.cache.server.ServerLoadProbe; +import com.gemstone.gemfire.cache.server.CacheServer; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.distributed.Locator; import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; -import com.gemstone.gemfire.test.dunit.*; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.*; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.DistributedTestCase; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.Invoke; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.NetworkUtils; +import com.gemstone.gemfire.test.dunit.SerializableCallable; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; /** * */ -public abstract class LocatorTestBase extends DistributedTestCase { +public abstract class LocatorTestBase extends DistributedTestCase { protected static final String CACHE_KEY = "CACHE"; protected static final String LOCATOR_KEY = "LOCATOR"; protected static final String REGION_NAME = "A_REGION"; protected static final String POOL_NAME = "daPool"; protected static final Object CALLBACK_KEY = "callback"; - /** - * A map for storing temporary objects in a remote VM so that they can be used + /** A map for storing temporary objects in a remote VM so that they can be used * between calls. Cleared after each test. */ protected static final HashMap remoteObjects = new HashMap(); @@ -52,211 +69,264 @@ public abstract class LocatorTestBase extends DistributedTestCase { public LocatorTestBase(String name) { super(name); } - + @Override public final void preTearDown() throws Exception { - + SerializableRunnable tearDown = new SerializableRunnable("tearDown") { public void run() { Locator locator = (Locator) remoteObjects.get(LOCATOR_KEY); - if (locator != null) { + if(locator != null) { try { locator.stop(); - } catch (Exception e) { + } catch(Exception e) { //do nothing } } - + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - if (cache != null) { + if(cache != null) { try { cache.close(); - } catch (Exception e) { + } catch(Exception e) { //do nothing } } remoteObjects.clear(); - + } }; //We seem to like leaving the DS open if we can for //speed, but lets at least destroy our cache and locator. Invoke.invokeInEveryVM(tearDown); tearDown.run(); - + postTearDownLocatorTestBase(); } - + protected void postTearDownLocatorTestBase() throws Exception { } + + protected void startLocatorInVM(final VM vm, final int locatorPort, final String otherLocators) { + vm.invoke(new SerializableRunnable("Create Locator") { - protected void startLocator(final Host vmHost, final int locatorPort, final String otherLocators) { - disconnectFromDS(); - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log"); - try { - InetAddress bindAddress = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost)); - Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddress, props); - remoteObjects.put(LOCATOR_KEY, locator); - } catch (UnknownHostException uhe) { - Assert.fail("While resolving bind address ", uhe); - } catch (IOException ex) { - Assert.fail("While starting locator on port " + locatorPort, ex); - } + final String testName= getUniqueName(); + public void run() { + disconnectFromDS(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); + props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + try { + File logFile = new File(testName + "-locator" + locatorPort + + ".log"); + InetAddress bindAddr = null; + try { + bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); + } catch (UnknownHostException uhe) { + Assert.fail("While resolving bind address ", uhe); + } + Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); + remoteObjects.put(LOCATOR_KEY, locator); + } catch (IOException ex) { + Assert.fail("While starting locator on port " + locatorPort, ex); + } + } + }); } - - protected void stopLocator() { - Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY); - locator.stop(); + + + + protected void stopLocatorInVM(VM vm) { + vm.invoke(new SerializableRunnable("Stop Locator") { + public void run() { + Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY); + locator.stop(); + } + }); } - - protected int startBridgeServer(String[] groups, String locators) throws IOException { - return startBridgeServer(groups, locators, new String[] { REGION_NAME }); + + protected int startBridgeServerInVM(VM vm, String[] groups, String locators) { + return startBridgeServerInVM(vm, groups, locators, new String[] {REGION_NAME}); } + + protected int addCacheServerInVM(VM vm, final String[] groups) { + SerializableCallable connect = + new SerializableCallable("Add Bridge server") { - protected int addCacheServer(final String[] groups) throws IOException { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - CacheServer server = cache.addCacheServer(); - server.setPort(0); - server.setGroups(groups); - server.start(); - return new Integer(server.getPort()); + public Object call() throws Exception { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + CacheServer server = cache.addCacheServer(); + final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); + server.setPort(serverPort); + server.setGroups(groups); + server.start(); + return new Integer(serverPort); + } + }; + Integer port = (Integer) vm.invoke(connect); + return port.intValue(); } - - protected int startBridgeServer(final String[] groups, final String locators, final String[] regions) throws IOException { - return startBridgeServer(groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE); + + protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions) { + return startBridgeServerInVM(vm, groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE); } - - protected int startBridgeServer(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) throws IOException { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setEnableBridgeConflation(true); - factory.setDataPolicy(DataPolicy.REPLICATE); - RegionAttributes attributes = factory.create(); - for (int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attributes); - } - CacheServer server = cache.addCacheServer(); - server.setPort(0); - server.setGroups(groups); - server.setLoadProbe(probe); - server.start(); - - remoteObjects.put(CACHE_KEY, cache); - - return new Integer(server.getPort()); + + protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { + SerializableCallable connect = + new SerializableCallable("Start bridge server") { + public Object call() throws IOException { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, locators); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setEnableBridgeConflation(true); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attrs = factory.create(); + for(int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attrs); + } + CacheServer server = cache.addCacheServer(); + final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); + server.setPort(serverPort); + server.setGroups(groups); + server.setLoadProbe(probe); + server.start(); + + remoteObjects.put(CACHE_KEY, cache); + + return new Integer(serverPort); + } + }; + Integer port = (Integer) vm.invoke(connect); + return port.intValue(); } - - protected int startBridgeServerWithEmbeddedLocator(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) - throws IOException { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setEnableBridgeConflation(true); - factory.setDataPolicy(DataPolicy.REPLICATE); - RegionAttributes attrs = factory.create(); - for (int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attrs); - } - CacheServer server = cache.addCacheServer(); - server.setGroups(groups); - server.setLoadProbe(probe); - server.setPort(0); - server.start(); - - remoteObjects.put(CACHE_KEY, cache); - - return new Integer(server.getPort()); + + protected int startBridgeServerWithEmbeddedLocator(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { + SerializableCallable connect = + new SerializableCallable("Start bridge server") { + public Object call() throws IOException { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators); + props.setProperty(DistributionConfig.LOCATORS_NAME, locators); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setEnableBridgeConflation(true); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attrs = factory.create(); + for(int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attrs); + } + CacheServer server = cache.addCacheServer(); + server.setGroups(groups); + server.setLoadProbe(probe); + final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); + server.setPort(serverPort); + server.start(); + + remoteObjects.put(CACHE_KEY, cache); + + return new Integer(serverPort); + } + }; + Integer port = (Integer) vm.invoke(connect); + return port.intValue(); } - - protected void startBridgeClient(final String group, final String host, final int port) throws Exception { - startBridgeClient(group, host, port, new String[] { REGION_NAME }); + + protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port) throws Exception { + startBridgeClientInVM(vm, group, host, port, new String[] {REGION_NAME}); } + - protected void startBridgeClient(final String group, final String host, final int port, final String[] regions) throws Exception { + protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port, final String[] regions) throws Exception { PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(host, port) - .setServerGroup(group) - .setPingInterval(200) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1); - startBridgeClient(pf.getPoolAttributes(), regions); + .setServerGroup(group) + .setPingInterval(200) + .setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1); + startBridgeClientInVM(vm, pf.getPoolAttributes(), regions); } - - protected void startBridgeClient(final Pool pool, final String[] regions) throws Exception { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, ""); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); - // factory.setEnableBridgeConflation(true); - // factory.setDataPolicy(DataPolicy.NORMAL); - factory.setPoolName(POOL_NAME); - PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory(); - pf.init(pool); - LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback(); - remoteObjects.put(CALLBACK_KEY, locatorCallback); - pf.setLocatorDiscoveryCallback(locatorCallback); - pf.create(POOL_NAME); - - RegionAttributes attrs = factory.create(); - for (int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attrs); + + protected void startBridgeClientInVM(VM vm, final Pool pool, final String[] regions) throws Exception { + SerializableRunnable connect = + new SerializableRunnable("Start bridge client") { + public void run() { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, ""); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); +// factory.setEnableBridgeConflation(true); +// factory.setDataPolicy(DataPolicy.NORMAL); + factory.setPoolName(POOL_NAME); + PoolFactoryImpl pf= (PoolFactoryImpl) PoolManager.createFactory(); + pf.init(pool); + LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback(); + remoteObjects.put(CALLBACK_KEY, locatorCallback); + pf.setLocatorDiscoveryCallback(locatorCallback); + pf.create(POOL_NAME); + + + RegionAttributes attrs = factory.create(); + for(int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attrs); + } + + remoteObjects.put(CACHE_KEY, cache); + } + }; + + if(vm == null) { + connect.run(); + } else { + vm.invoke(connect); } - - remoteObjects.put(CACHE_KEY, cache); } - + protected void stopBridgeMemberVM(VM vm) { - vm.invoke(new SerializableRunnable("Stop bridge member") { - public void run() { - Cache cache = (Cache) remoteObjects.remove(CACHE_KEY); - cache.close(); - disconnectFromDS(); - } - }); + vm.invoke(new SerializableRunnable("Stop bridge member") { + public void run() { + Cache cache = (Cache) remoteObjects.remove(CACHE_KEY); + cache.close(); + disconnectFromDS(); + } + }); } - + public String getLocatorString(Host host, int locatorPort) { - return getLocatorString(host, new int[] { locatorPort }); + return getLocatorString(host, new int[] {locatorPort}); } - + public String getLocatorString(Host host, int[] locatorPorts) { StringBuffer str = new StringBuffer(); - for (int i = 0; i < locatorPorts.length; i++) { + for(int i = 0; i < locatorPorts.length; i++) { str.append(NetworkUtils.getServerHostName(host)) .append("[") .append(locatorPorts[i]) .append("]"); - if (i < locatorPorts.length - 1) { + if(i < locatorPorts.length - 1) { str.append(","); } } - + return str.toString(); } - + protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { private final Set discoveredLocators = new HashSet(); private final Set removedLocators = new HashSet(); - + public synchronized void locatorsDiscovered(List locators) { discoveredLocators.addAll(locators); notifyAll(); @@ -266,29 +336,29 @@ public abstract class LocatorTestBase extends DistributedTestCase { removedLocators.addAll(locators); notifyAll(); } - + public boolean waitForDiscovery(InetSocketAddress locator, long time) throws InterruptedException { return waitFor(discoveredLocators, locator, time); } - + public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException { return waitFor(removedLocators, locator, time); } - + private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) throws InterruptedException { long remaining = time; long endTime = System.currentTimeMillis() + time; - while (!set.contains(locator) && remaining >= 0) { + while(!set.contains(locator) && remaining >= 0) { wait(remaining); - remaining = endTime - System.currentTimeMillis(); + remaining = endTime - System.currentTimeMillis(); } return set.contains(locator); } - + public synchronized Set getDiscovered() { return new HashSet(discoveredLocators); } - + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java index 0f08456..de43c29 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java @@ -56,11 +56,11 @@ public class Bug47667DUnitTest extends LocatorTestBase { final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); final String locatorHost = NetworkUtils.getServerHostName(host); - locator.invoke("Start Locator",() ->startLocator(locator.getHost(), locatorPort, "")); + startLocatorInVM(locator, locatorPort, ""); String locString = getLocatorString(host, locatorPort); - server1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"})); - server2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R2"}, locString, new String[] {"R2"})); + startBridgeServerInVM(server1, new String[] {"R1"}, locString, new String[] {"R1"}); + startBridgeServerInVM(server2, new String[] {"R2"}, locString, new String[] {"R2"}); client.invoke(new SerializableCallable() { @Override @@ -70,15 +70,15 @@ public class Bug47667DUnitTest extends LocatorTestBase { ClientCache cache = ccf.create(); PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R1").create("R1"); PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R2").create("R2"); - Region region1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1"); - Region region2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2"); - CacheTransactionManager transactionManager = cache.getCacheTransactionManager(); - transactionManager.begin(); - region1.put(1, "value1"); - transactionManager.commit(); - transactionManager.begin(); - region2.put(2, "value2"); - transactionManager.commit(); + Region r1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1"); + Region r2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2"); + CacheTransactionManager mgr = cache.getCacheTransactionManager(); + mgr.begin(); + r1.put(1, "value1"); + mgr.commit(); + mgr.begin(); + r2.put(2, "value2"); + mgr.commit(); return null; } }); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/d1a0748b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java index 167cc3a..d784397 100644 --- a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java +++ b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java @@ -16,9 +16,26 @@ */ package com.gemstone.gemfire.management; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Properties; + +import javax.management.InstanceNotFoundException; +import javax.management.MBeanServer; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.ObjectName; + import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.client.internal.LocatorTestBase; -import com.gemstone.gemfire.cache.query.*; +import com.gemstone.gemfire.cache.query.IndexExistsException; +import com.gemstone.gemfire.cache.query.IndexInvalidException; +import com.gemstone.gemfire.cache.query.IndexNameConflictException; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.RegionNotFoundException; import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryDUnitTest; import com.gemstone.gemfire.cache.query.internal.cq.CqService; import com.gemstone.gemfire.cache.server.CacheServer; @@ -29,34 +46,37 @@ import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.management.internal.JmxManagerLocatorRequest; +import com.gemstone.gemfire.management.internal.JmxManagerLocatorResponse; import com.gemstone.gemfire.management.internal.MBeanJMXAdapter; import com.gemstone.gemfire.management.internal.SystemManagementService; -import com.gemstone.gemfire.test.dunit.*; - -import javax.management.*; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.Properties; +import com.gemstone.gemfire.test.dunit.Assert; +import com.gemstone.gemfire.test.dunit.Host; +import com.gemstone.gemfire.test.dunit.LogWriterUtils; +import com.gemstone.gemfire.test.dunit.NetworkUtils; +import com.gemstone.gemfire.test.dunit.SerializableRunnable; +import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.Wait; +import com.gemstone.gemfire.test.dunit.WaitCriterion; /** * Cache Server related management test cases + * + * */ public class CacheServerManagementDUnitTest extends LocatorTestBase { private static final long serialVersionUID = 1L; - - private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000; + + private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000; private ManagementTestBase helper; private static final String queryName = "testClientWithFeederAndCQ_0"; private static final String indexName = "testIndex"; - + private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer; + protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest( "CqDataDUnitTest"); @@ -64,7 +84,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { public CacheServerManagementDUnitTest(String name) { super(name); this.helper = new ManagementTestBase(name); - + } @Override @@ -78,6 +98,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { } /** + * * @throws Exception */ public void testCacheServerMBean() throws Exception { @@ -91,11 +112,12 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { helper.startManagingNode(managingNode); //helper.createCache(server); int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cqDUnitTest.createServer(server, serverPort); - + cqDUnitTest.createServer(server,serverPort); + + DistributedMember member = helper.getMember(server); - - verifyCacheServer(server, serverPort); + + verifyCacheServer(server,serverPort); final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort()); final String host0 = NetworkUtils.getServerHostName(server.getHost()); @@ -123,10 +145,10 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { // Close. Wait.pause(2000); - checkNavigation(managingNode, member, serverPort); - verifyIndex(server, serverPort); + checkNavigation(managingNode,member,serverPort); + verifyIndex(server,serverPort); // This will test all CQs and will close the cq in its final step - verifyCacheServerRemote(managingNode, member, serverPort); + verifyCacheServerRemote(managingNode, member,serverPort); verifyClosedCQ(server); @@ -140,30 +162,30 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Test for client server connection related management artifacts - * like notifications - * + * like notifications * @throws Exception */ - + public void testCacheClient() throws Exception { - + final Host host = Host.getHost(0); VM locator = host.getVM(0); VM server = host.getVM(1); VM client = host.getVM(2); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); - - String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; - - int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); - - addClientNotifListener(server, serverPort); + startLocatorInVM(locator, locatorPort, ""); + + String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]"; + + + int serverPort = startBridgeServerInVM(server, null, locators); + + addClientNotifListener(server,serverPort); // Start a client and make sure that proper notification is received - client.invoke("Start BridgeClient", () -> startBridgeClient(null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort)); - + startBridgeClientInVM(client, null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort); + //stop the client and make sure the bridge server notifies stopBridgeMemberVM(client); helper.closeCache(locator); @@ -171,14 +193,13 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { helper.closeCache(client); } - + /** * Intention of this test is to check if a node becomes manager after all the nodes are alive * it should have all the information of all the members. - * <p> + * * Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node * ready for federation when manager node comes up - * * @throws Exception */ @@ -187,72 +208,96 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { final Host host = Host.getHost(0); VM locator = host.getVM(0); VM server = host.getVM(1); - + //Step 1: final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); - - String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; + startLocator(locator, locatorPort, ""); + String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]"; + //Step 2: - server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); - + int serverPort = startBridgeServerInVM(server, null, locators); + //Step 3: - server.invoke("Check Server", () -> { - Cache cache = GemFireCacheImpl.getInstance(); - assertNotNull(cache); - SystemManagementService service = (SystemManagementService) ManagementService - .getExistingManagementService(cache); - assertNotNull(service); - assertFalse(service.isManager()); - assertNotNull(service.getMemberMXBean()); - service.getLocalManager().runManagementTaskAdhoc(); + server.invoke(new SerializableRunnable("Check Server") { + + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + assertNotNull(cache); + SystemManagementService service = (SystemManagementService)ManagementService + .getExistingManagementService(cache); + assertNotNull(service); + assertFalse(service.isManager()); + assertNotNull(service.getMemberMXBean()); + service.getLocalManager().runManagementTaskAdhoc(); + + + } }); + + //Step 4: + JmxManagerLocatorResponse locRes = JmxManagerLocatorRequest.send(locator + .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String> emptyMap()); + + //Step 5: + locator.invoke(new SerializableRunnable("Check locator") { - //Step 4: - JmxManagerLocatorRequest.send(locator - .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String>emptyMap()); - - //Step 5: - locator.invoke("Check locator", () -> { - Cache cache = GemFireCacheImpl.getInstance(); - assertNotNull(cache); - ManagementService service = ManagementService - .getExistingManagementService(cache); - assertNotNull(service); - assertTrue(service.isManager()); - LocatorMXBean bean = service.getLocalLocatorMXBean(); - assertEquals(locatorPort, bean.getPort()); - DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); - - assertEquals(2, dsBean.listMemberObjectNames().length); + public void run() { + Cache cache = GemFireCacheImpl.getInstance(); + assertNotNull(cache); + ManagementService service = ManagementService + .getExistingManagementService(cache); + assertNotNull(service); + assertTrue(service.isManager()); + LocatorMXBean bean = service.getLocalLocatorMXBean(); + assertEquals(locatorPort, bean.getPort()); + DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); + ObjectName[] names = dsBean.listMemberObjectNames(); + + assertEquals(2,dsBean.listMemberObjectNames().length); + + } }); + + helper.closeCache(locator); helper.closeCache(server); - + + } + + + protected void startLocator(final VM vm, final int locatorPort, final String otherLocators) { + vm.invoke(new SerializableRunnable("Create Locator") { - protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) { - disconnectFromDS(); - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0"); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log"); - try { - InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost)); - Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); - remoteObjects.put(LOCATOR_KEY, locator); - } catch (UnknownHostException uhe) { - Assert.fail("While resolving bind address ", uhe); - } catch (IOException ex) { - Assert.fail("While starting locator on port " + locatorPort, ex); - } + final String testName= getUniqueName(); + public void run() { + disconnectFromDS(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); + props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); + props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0"); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + try { + File logFile = new File(testName + "-locator" + locatorPort + + ".log"); + InetAddress bindAddr = null; + try { + bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); + } catch (UnknownHostException uhe) { + Assert.fail("While resolving bind address ", uhe); + } + Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); + remoteObjects.put(LOCATOR_KEY, locator); + } catch (IOException ex) { + Assert.fail("While starting locator on port " + locatorPort, ex); + } + } + }); } - + protected void checkNavigation(final VM vm, final DistributedMember cacheServerMember, final int serverPort) { SerializableRunnable checkNavigation = new SerializableRunnable( @@ -282,14 +327,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { }; vm.invoke(checkNavigation); } - + /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") - protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception { + protected void addClientNotifListener(final VM vm , final int serverPort) throws Exception { SerializableRunnable addClientNotifListener = new SerializableRunnable( "Add Client Notif Listener") { public void run() { @@ -314,19 +359,18 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { TestCacheServerNotif nt = new TestCacheServerNotif(); try { mbeanServer.addNotificationListener(MBeanJMXAdapter - .getClientServiceMBeanName(serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null); + .getClientServiceMBeanName(serverPort,cache.getDistributedSystem().getMemberId()), nt, null, null); } catch (InstanceNotFoundException e) { fail("Failed With Exception " + e); } - + } }; vm.invoke(addClientNotifListener); } - /** * Verify the closed CQ which is closed from Managing Node - * + * * @param vm */ @SuppressWarnings("serial") @@ -361,7 +405,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { bean.removeIndex(indexName); } catch (Exception e) { fail("Failed With Exception " + e); - + } assertEquals(bean.getIndexCount(), 0); @@ -372,7 +416,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Verify the closed CQ which is closed from Managing Node - * + * * @param vm */ @SuppressWarnings("serial") @@ -390,9 +434,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { vm.invoke(verifyClosedCQ); } + + /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") @@ -450,7 +496,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") @@ -476,14 +522,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { LogWriterUtils.getLogWriter().info( "<ExpectedString> Active Query Count " + bean.getActiveCQCount() + "</ExpectedString> "); - + LogWriterUtils.getLogWriter().info( "<ExpectedString> Registered Query Count " + bean.getRegisteredCQCount() + "</ExpectedString> "); - assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1); - int numQueues = bean.getNumSubscriptions(); - assertEquals(numQueues, 1); + assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1); + int numQueues = bean.getNumSubscriptions(); + assertEquals(numQueues, 1); // test for client connection Count /* @TODO */ @@ -506,9 +552,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { }; vm.invoke(verifyCacheServerRemote); } - + /** * Notification handler + * + * */ private static class TestCacheServerNotif implements NotificationListener {