http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java index 9ef87d2..176fbea 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorLoadBalancingDUnitTest.java @@ -16,17 +16,6 @@ */ package com.gemstone.gemfire.cache.client.internal; -import java.io.IOException; -import java.io.Serializable; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; - import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.client.PoolManager; import com.gemstone.gemfire.cache.client.internal.locator.ClientConnectionRequest; @@ -50,24 +39,31 @@ import com.gemstone.gemfire.internal.logging.LocalLogWriter; import com.gemstone.gemfire.test.dunit.Host; import com.gemstone.gemfire.test.dunit.LogWriterUtils; import com.gemstone.gemfire.test.dunit.NetworkUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.jayway.awaitility.Awaitility; +import org.junit.Assert; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; /** * */ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { - + /** * The number of connections that we can be off by in the balancing tests * We need this little fudge factor, because the locator can receive an update * from the bridge server after it has made incremented its counter for a client * connection, but the client hasn't connected yet. This wipes out the estimation * on the locator. This means that we may be slighly off in our balance. - * + * <p> * TODO grid fix this hole in the locator. */ private static final int ALLOWABLE_ERROR_IN_COUNT = 1; @@ -78,149 +74,144 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { } /** - * Test the locator discovers a bridge server and is initialized with + * Test the locator discovers a bridge server and is initialized with * the correct load for that bridge server. */ public void testDiscovery() { Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); - VM vm2 = host.getVM(2); -// vm0.invoke(new SerializableRunnable() { -// public void run() { -// System.setProperty("gemfire.DistributionAdvisor.VERBOSE", "true"); -// } -// }); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); - + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); + String locators = getLocatorString(host, locatorPort); - - int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); - + + int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(vm0 .getHost()), serverPort); Map expected = new HashMap(); expected.put(expectedLocation, expectedLoad); - - checkLocatorLoad(vm0, expected); - - int serverPort2 = startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); - + + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + + int serverPort2 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + ServerLocation expectedLocation2 = new ServerLocation(NetworkUtils.getServerHostName(vm0 .getHost()), serverPort2); - + expected.put(expectedLocation2, expectedLoad); - checkLocatorLoad(vm0, expected); + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); } - + /** * Test that the locator will properly estimate the load for servers when - * it receives connection requests. + * it receives connection requests. */ - public void testEstimation() throws UnknownHostException, IOException, ClassNotFoundException { + public void testEstimation() throws IOException, ClassNotFoundException { Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); String locators = getLocatorString(host, locatorPort); - - int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); - - ServerLoad expectedLoad = new ServerLoad(2/800f, 1 / 800.0f, 0f, 1f); + + int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + + ServerLoad expectedLoad = new ServerLoad(2 / 800f, 1 / 800.0f, 0f, 1f); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort); Map expected = new HashMap(); expected.put(expectedLocation, expectedLoad); - + ClientConnectionResponse response; response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000); Assert.assertEquals(expectedLocation, response.getServer()); - + response = (ClientConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new ClientConnectionRequest(Collections.EMPTY_SET, null), 10000, true); Assert.assertEquals(expectedLocation, response.getServer()); - + //we expect that the connection load load will be 2 * the loadPerConnection - checkLocatorLoad(vm0, expected); - + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + QueueConnectionResponse response2; response2 = (QueueConnectionResponse) TcpClient.requestToServer(InetAddress - .getByName(NetworkUtils.getServerHostName(host)), locatorPort, + .getByName(NetworkUtils.getServerHostName(host)), locatorPort, new QueueConnectionRequest(null, 2, Collections.EMPTY_SET, null, false), 10000, true); Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers()); - + response2 = (QueueConnectionResponse) TcpClient .requestToServer(InetAddress.getByName(NetworkUtils.getServerHostName(host)), locatorPort, new QueueConnectionRequest(null, 5, Collections.EMPTY_SET, null, false), 10000, true); - + Assert.assertEquals(Collections.singletonList(expectedLocation), response2.getServers()); //we expect that the queue load will increase by 2 expectedLoad.setSubscriptionConnectionLoad(2f); - checkLocatorLoad(vm0, expected); + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); } - + /** * Test to make sure the bridge servers communicate * their updated load to the controller when the load * on the bridge server changes. - * @throws Exception + * + * @throws Exception */ public void testLoadMessaging() throws Exception { final Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); String locators = getLocatorString(host, locatorPort); - - final int serverPort = startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); - + + final int serverPort = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + //We expect 0 load Map expected = new HashMap(); ServerLocation expectedLocation = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort); ServerLoad expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); expected.put(expectedLocation, expectedLoad); - checkLocatorLoad(vm0, expected); - + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addServer(NetworkUtils.getServerHostName(host), serverPort); pf.setMinConnections(8); pf.setMaxConnections(8); pf.setSubscriptionEnabled(true); - startBridgeClientInVM(vm2, pf.getPoolAttributes(), new String[] {REGION_NAME}); - + vm2.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME })); + //We expect 8 client to server connections. The queue requires //an additional client to server connection, but that shouldn't show up here. - expectedLoad = new ServerLoad(8/800f, 1 / 800.0f, 1f, 1f); + expectedLoad = new ServerLoad(8 / 800f, 1 / 800.0f, 1f, 1f); expected.put(expectedLocation, expectedLoad); - - - checkLocatorLoad(vm0, expected); - + + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + stopBridgeMemberVM(vm2); - + //Now we expect 0 load expectedLoad = new ServerLoad(0f, 1 / 800.0f, 0f, 1f); expected.put(expectedLocation, expectedLoad); - checkLocatorLoad(vm0, expected); + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); } - + /** * Test to make sure that the locator * balancing load between two servers. - * @throws Exception + * + * @throws Exception */ public void testBalancing() throws Exception { final Host host = Host.getHost(0); @@ -228,87 +219,60 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); String locators = getLocatorString(host, locatorPort); - - startBridgeServerInVM(vm1, new String[] {"a", "b"}, locators); - startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); - + + vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(80); pf.setMaxConnections(80); pf.setSubscriptionEnabled(false); pf.setIdleTimeout(-1); - startBridgeClientInVM(vm3, pf.getPoolAttributes(), new String[] {REGION_NAME}); - - waitForPrefilledConnections(vm3, 80); - - checkConnectionCount(vm1, 40); - checkConnectionCount(vm2, 40); + vm3.invoke("StartBridgeClient", () -> startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME })); + + vm3.invoke("waitForPrefilledConnections", () -> waitForPrefilledConnections(80)); + + vm1.invoke("check connection count", () -> checkConnectionCount(40)); + vm2.invoke("check connection count", () -> checkConnectionCount(40)); } - private void checkConnectionCount(VM vm, final int count) { - SerializableRunnableIF checkConnectionCount = new SerializableRunnable("checkConnectionCount") { - public void run() { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - final CacheServerImpl server = (CacheServerImpl) - cache.getCacheServers().get(0); - WaitCriterion wc = new WaitCriterion() { - String excuse; - public boolean done() { - int sz = server.getAcceptor().getStats() - .getCurrentClientConnections(); - if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) { - return true; - } - excuse = "Found " + sz + " connections, expected " + count; - return false; - } - public String description() { - return excuse; - } - }; - Wait.waitForCriterion(wc, 5 * 60 * 1000, 1000, true); + private void checkConnectionCount(final int count) { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + final CacheServerImpl server = (CacheServerImpl) + cache.getCacheServers().get(0); + Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .timeout(300, TimeUnit.SECONDS).until(() -> { + int sz = server.getAcceptor().getStats().getCurrentClientConnections(); + if (Math.abs(sz - count) <= ALLOWABLE_ERROR_IN_COUNT) { + return true; } - }; - - vm.invoke(checkConnectionCount); + System.out.println("Found " + sz + " connections, expected " + count); + return false; + }); } - - private void waitForPrefilledConnections(VM vm, final int count) throws Exception { - waitForPrefilledConnections(vm, count, POOL_NAME); + + private void waitForPrefilledConnections(final int count) throws Exception { + waitForPrefilledConnections(count, POOL_NAME); } - private void waitForPrefilledConnections(VM vm, final int count, final String poolName) throws Exception { - SerializableRunnable runnable = new SerializableRunnable("waitForPrefilledConnections") { - public void run() { - final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName); - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - return pool.getConnectionCount() >= count; - } - public String description() { - return "connection count never reached " + count; - } - }; - Wait.waitForCriterion(ev, MAX_WAIT, 200, true); - } - }; - if(vm == null) { - runnable.run(); - } else { - vm.invoke(runnable); - } + private void waitForPrefilledConnections(final int count, final String poolName) throws Exception { + final PoolImpl pool = (PoolImpl) PoolManager.getAll().get(poolName); + Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .timeout(300, TimeUnit.SECONDS).until(() -> pool.getConnectionCount() >= count); } - - /** Test that the locator balances load between + + /** + * Test that the locator balances load between * three servers with intersecting server groups. * Server: 1 2 3 * Groups: a a,b b - * @throws Exception + * + * @throws Exception */ public void testIntersectingServerGroups() throws Exception { final Host host = Host.getHost(0); @@ -316,175 +280,158 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); VM vm3 = host.getVM(3); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); String locators = getLocatorString(host, locatorPort); - - int serverPort1 = startBridgeServerInVM(vm1, new String[] {"a"}, locators); - startBridgeServerInVM(vm2, new String[] {"a", "b"}, locators); - startBridgeServerInVM(vm3, new String[] {"b"}, locators); - + + int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a" }, locators)); + vm2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "a", "b" }, locators)); + vm3.invoke("Start BridgeServer", () -> startBridgeServer(new String[] { "b" }, locators)); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(12); pf.setSubscriptionEnabled(false); pf.setServerGroup("a"); pf.setIdleTimeout(-1); - startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME}); - waitForPrefilledConnections(null, 12); - - checkConnectionCount(vm1, 6); - checkConnectionCount(vm2, 6); - checkConnectionCount(vm3, 0); - + startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }); + waitForPrefilledConnections(12); + + vm1.invoke("Check Connection Count", () -> checkConnectionCount(6)); + vm2.invoke("Check Connection Count", () -> checkConnectionCount(6)); + vm3.invoke("Check Connection Count", () -> checkConnectionCount(0)); + LogWriterUtils.getLogWriter().info("pool1 prefilled"); - + PoolFactoryImpl pf2 = (PoolFactoryImpl) PoolManager.createFactory(); pf2.init(pf.getPoolAttributes()); pf2.setServerGroup("b"); - PoolImpl pool2= (PoolImpl) pf2.create("testPool2"); - waitForPrefilledConnections(null, 12, "testPool2"); + PoolImpl pool2 = (PoolImpl) pf2.create("testPool2"); + waitForPrefilledConnections(12, "testPool2"); // The load will not be perfect, because we created all of the connections //for group A first. - checkConnectionCount(vm1, 6); - checkConnectionCount(vm2, 9); - checkConnectionCount(vm3, 9); - + vm1.invoke("Check Connection Count", () -> checkConnectionCount(6)); + vm2.invoke("Check Connection Count", () -> checkConnectionCount(9)); + vm3.invoke("Check Connection Count", () -> checkConnectionCount(9)); + LogWriterUtils.getLogWriter().info("pool2 prefilled"); - + ServerLocation location1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1); PoolImpl pool1 = (PoolImpl) PoolManager.getAll().get(POOL_NAME); Assert.assertEquals("a", pool1.getServerGroup()); - + //Use up all of the pooled connections on pool1, and acquire 3 more - for(int i = 0; i < 15; i++) { + for (int i = 0; i < 15; i++) { pool1.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("aquired 15 connections in pool1"); - + //now the load should be equal - checkConnectionCount(vm1, 9); - checkConnectionCount(vm2, 9); - checkConnectionCount(vm3, 9); - + vm1.invoke("Check Connection Count", () -> checkConnectionCount(9)); + vm2.invoke("Check Connection Count", () -> checkConnectionCount(9)); + vm3.invoke("Check Connection Count", () -> checkConnectionCount(9)); + //use up all of the pooled connections on pool2 - for(int i = 0; i < 12; i++) { + for (int i = 0; i < 12; i++) { pool2.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("aquired 12 connections in pool2"); - + //interleave creating connections in both pools - for(int i = 0; i < 6; i++) { + for (int i = 0; i < 6; i++) { pool1.acquireConnection(); pool2.acquireConnection(); } - + LogWriterUtils.getLogWriter().info("interleaved 6 connections from pool1 with 6 connections from pool2"); - + //The load should still be balanced - checkConnectionCount(vm1, 13); - checkConnectionCount(vm2, 13); - checkConnectionCount(vm3, 13); - + vm1.invoke("Check Connection Count", () -> checkConnectionCount(13)); + vm2.invoke("Check Connection Count", () -> checkConnectionCount(13)); + vm3.invoke("Check Connection Count", () -> checkConnectionCount(13)); + } - + public void testCustomLoadProbe() throws Exception { final Host host = Host.getHost(0); VM vm0 = host.getVM(0); VM vm1 = host.getVM(1); VM vm2 = host.getVM(2); -// VM vm3 = host.getVM(3); - + // VM vm3 = host.getVM(3); + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(vm0, locatorPort, ""); + vm0.invoke("Start Locator", () -> startLocator(vm0.getHost(), locatorPort, "")); String locators = getLocatorString(host, locatorPort); - - ServerLoad load1= new ServerLoad(.3f, .01f, .44f, 4564f); - ServerLoad load2= new ServerLoad(23.2f, 1.1f, 22.3f, .3f); - int serverPort1 = startBridgeServerInVM(vm1, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load1 )); - int serverPort2 = startBridgeServerInVM(vm2, null, locators, new String[] {REGION_NAME}, new MyLoadProbe(load2 )); - + + final ServerLoad load1 = new ServerLoad(.3f, .01f, .44f, 4564f); + final ServerLoad load2 = new ServerLoad(23.2f, 1.1f, 22.3f, .3f); + int serverPort1 = vm1.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load1))); + int serverPort2 = vm2.invoke("Start BridgeServer", () -> startBridgeServer(null, locators, new String[] { REGION_NAME }, new MyLoadProbe(load2))); + HashMap expected = new HashMap(); ServerLocation l1 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort1); ServerLocation l2 = new ServerLocation(NetworkUtils.getServerHostName(host), serverPort2); expected.put(l1, load1); expected.put(l2, load2); - checkLocatorLoad(vm0, expected); - + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + load1.setConnectionLoad(25f); - changeLoad(vm1, load1); + vm1.invoke("changeLoad", () -> changeLoad(load1)); load2.setSubscriptionConnectionLoad(3.5f); - changeLoad(vm2, load2); - checkLocatorLoad(vm0, expected); - - load1 = new ServerLoad(1f, .1f, 0f, 1f); - load2 = new ServerLoad(2f, 5f, 0f, 2f); - expected.put(l1, load1); - expected.put(l2, load2); - changeLoad(vm1, load1); - changeLoad(vm2, load2); - checkLocatorLoad(vm0, expected); - + vm2.invoke("changeLoad", () -> changeLoad(load2)); + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + + final ServerLoad load1Updated = new ServerLoad(1f, .1f, 0f, 1f); + final ServerLoad load2Updated = new ServerLoad(2f, 5f, 0f, 2f); + expected.put(l1, load1Updated); + expected.put(l2, load2Updated); + vm1.invoke("changeLoad", () -> changeLoad(load1Updated)); + vm2.invoke("changeLoad", () -> changeLoad(load2Updated)); + vm0.invoke("check Locator Load", () -> checkLocatorLoad(expected)); + PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(NetworkUtils.getServerHostName(host), locatorPort); pf.setMinConnections(20); pf.setSubscriptionEnabled(true); pf.setIdleTimeout(-1); - startBridgeClientInVM(null, pf.getPoolAttributes(), new String[] {REGION_NAME}); - waitForPrefilledConnections(null, 20); - + startBridgeClient(pf.getPoolAttributes(), new String[] { REGION_NAME }); + waitForPrefilledConnections(20); + //The first 10 connection should to go vm1, then 1 to vm2, then another 9 to vm1 //because have unequal values for loadPerConnection - checkConnectionCount(vm1, 19); - checkConnectionCount(vm2, 1); + vm1.invoke("Check Connection Count", () -> checkConnectionCount(19)); + vm2.invoke("Check Connection Count", () -> checkConnectionCount(1)); } - - public void checkLocatorLoad(VM vm, final Map expected) { - vm.invoke(new SerializableRunnable() { - public void run() { - List locators = Locator.getLocators(); - Assert.assertEquals(1, locators.size()); - InternalLocator locator = (InternalLocator) locators.get(0); - final ServerLocator sl = locator.getServerLocatorAdvisee(); - InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out); - sl.getDistributionAdvisor().dumpProfiles("PROFILES= "); - WaitCriterion ev = new WaitCriterion() { - public boolean done() { - return expected.equals(sl.getLoadMap()); - } - public String description() { - return "load map never became equal to " + expected; - } - }; - Wait.waitForCriterion(ev, MAX_WAIT, 200, true); - } - }); + + public void checkLocatorLoad(final Map expected) { + List locators = Locator.getLocators(); + Assert.assertEquals(1, locators.size()); + InternalLocator locator = (InternalLocator) locators.get(0); + final ServerLocator sl = locator.getServerLocatorAdvisee(); + InternalLogWriter log = new LocalLogWriter(InternalLogWriter.FINEST_LEVEL, System.out); + sl.getDistributionAdvisor().dumpProfiles("PROFILES= "); + Awaitility.await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS) + .timeout(300, TimeUnit.SECONDS).until(() -> expected.equals(sl.getLoadMap())); } - - private void changeLoad(VM vm, final ServerLoad newLoad) { - vm.invoke(new SerializableRunnable() { - - public void run() { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - CacheServer server = (CacheServer) cache.getCacheServers().get(0); - MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe(); - probe.setLoad(newLoad); - } - - }); + + private void changeLoad(final ServerLoad newLoad) { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + CacheServer server = cache.getCacheServers().get(0); + MyLoadProbe probe = (MyLoadProbe) server.getLoadProbe(); + probe.setLoad(newLoad); } - + private static class MyLoadProbe extends ServerLoadProbeAdapter implements Serializable { private ServerLoad load; - + public MyLoadProbe(ServerLoad load) { this.load = load; } - + public ServerLoad getLoad(ServerMetrics metrics) { float connectionLoad = load.getConnectionLoad() + metrics.getConnectionCount() * load.getLoadPerConnection(); @@ -493,7 +440,7 @@ public class LocatorLoadBalancingDUnitTest extends LocatorTestBase { return new ServerLoad(connectionLoad, load.getLoadPerConnection(), queueLoad, load.getLoadPerSubscriptionConnection()); } - + public void setLoad(ServerLoad load) { this.load = load; }
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java index 2207e1d..af5ba9c 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/cache/client/internal/LocatorTestBase.java @@ -16,52 +16,35 @@ */ package com.gemstone.gemfire.cache.client.internal; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.CacheFactory; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.*; import com.gemstone.gemfire.cache.client.Pool; import com.gemstone.gemfire.cache.client.PoolManager; -import com.gemstone.gemfire.cache.server.ServerLoadProbe; import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.server.ServerLoadProbe; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.distributed.Locator; import com.gemstone.gemfire.distributed.internal.DistributionConfig; -import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.cache.PoolFactoryImpl; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.DistributedTestCase; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.Invoke; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.NetworkUtils; -import com.gemstone.gemfire.test.dunit.SerializableCallable; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; +import com.gemstone.gemfire.test.dunit.*; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; +import java.util.*; /** * */ -public abstract class LocatorTestBase extends DistributedTestCase { +public abstract class LocatorTestBase extends DistributedTestCase { protected static final String CACHE_KEY = "CACHE"; protected static final String LOCATOR_KEY = "LOCATOR"; protected static final String REGION_NAME = "A_REGION"; protected static final String POOL_NAME = "daPool"; protected static final Object CALLBACK_KEY = "callback"; - /** A map for storing temporary objects in a remote VM so that they can be used + /** + * A map for storing temporary objects in a remote VM so that they can be used * between calls. Cleared after each test. */ protected static final HashMap remoteObjects = new HashMap(); @@ -69,264 +52,211 @@ public abstract class LocatorTestBase extends DistributedTestCase { public LocatorTestBase(String name) { super(name); } - + @Override public final void preTearDown() throws Exception { - + SerializableRunnable tearDown = new SerializableRunnable("tearDown") { public void run() { Locator locator = (Locator) remoteObjects.get(LOCATOR_KEY); - if(locator != null) { + if (locator != null) { try { locator.stop(); - } catch(Exception e) { + } catch (Exception e) { //do nothing } } - + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - if(cache != null) { + if (cache != null) { try { cache.close(); - } catch(Exception e) { + } catch (Exception e) { //do nothing } } remoteObjects.clear(); - + } }; //We seem to like leaving the DS open if we can for //speed, but lets at least destroy our cache and locator. Invoke.invokeInEveryVM(tearDown); tearDown.run(); - + postTearDownLocatorTestBase(); } - + protected void postTearDownLocatorTestBase() throws Exception { } - - protected void startLocatorInVM(final VM vm, final int locatorPort, final String otherLocators) { - vm.invoke(new SerializableRunnable("Create Locator") { - final String testName= getUniqueName(); - public void run() { - disconnectFromDS(); - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - try { - File logFile = new File(testName + "-locator" + locatorPort - + ".log"); - InetAddress bindAddr = null; - try { - bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); - } catch (UnknownHostException uhe) { - Assert.fail("While resolving bind address ", uhe); - } - Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); - remoteObjects.put(LOCATOR_KEY, locator); - } catch (IOException ex) { - Assert.fail("While starting locator on port " + locatorPort, ex); - } - } - }); + protected void startLocator(final Host vmHost, final int locatorPort, final String otherLocators) { + disconnectFromDS(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); + props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log"); + try { + InetAddress bindAddress = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost)); + Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddress, props); + remoteObjects.put(LOCATOR_KEY, locator); + } catch (UnknownHostException uhe) { + Assert.fail("While resolving bind address ", uhe); + } catch (IOException ex) { + Assert.fail("While starting locator on port " + locatorPort, ex); + } } - - - - protected void stopLocatorInVM(VM vm) { - vm.invoke(new SerializableRunnable("Stop Locator") { - public void run() { - Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY); - locator.stop(); - } - }); + + protected void stopLocator() { + Locator locator = (Locator) remoteObjects.remove(LOCATOR_KEY); + locator.stop(); } - - protected int startBridgeServerInVM(VM vm, String[] groups, String locators) { - return startBridgeServerInVM(vm, groups, locators, new String[] {REGION_NAME}); + + protected int startBridgeServer(String[] groups, String locators) throws IOException { + return startBridgeServer(groups, locators, new String[] { REGION_NAME }); } - - protected int addCacheServerInVM(VM vm, final String[] groups) { - SerializableCallable connect = - new SerializableCallable("Add Bridge server") { - public Object call() throws Exception { - Cache cache = (Cache) remoteObjects.get(CACHE_KEY); - CacheServer server = cache.addCacheServer(); - final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - server.setPort(serverPort); - server.setGroups(groups); - server.start(); - return new Integer(serverPort); - } - }; - Integer port = (Integer) vm.invoke(connect); - return port.intValue(); + protected int addCacheServer(final String[] groups) throws IOException { + Cache cache = (Cache) remoteObjects.get(CACHE_KEY); + CacheServer server = cache.addCacheServer(); + server.setPort(0); + server.setGroups(groups); + server.start(); + return new Integer(server.getPort()); } - - protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions) { - return startBridgeServerInVM(vm, groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE); + + protected int startBridgeServer(final String[] groups, final String locators, final String[] regions) throws IOException { + return startBridgeServer(groups, locators, regions, CacheServer.DEFAULT_LOAD_PROBE); } - - protected int startBridgeServerInVM(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { - SerializableCallable connect = - new SerializableCallable("Start bridge server") { - public Object call() throws IOException { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setEnableBridgeConflation(true); - factory.setDataPolicy(DataPolicy.REPLICATE); - RegionAttributes attrs = factory.create(); - for(int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attrs); - } - CacheServer server = cache.addCacheServer(); - final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - server.setPort(serverPort); - server.setGroups(groups); - server.setLoadProbe(probe); - server.start(); - - remoteObjects.put(CACHE_KEY, cache); - - return new Integer(serverPort); - } - }; - Integer port = (Integer) vm.invoke(connect); - return port.intValue(); + + protected int startBridgeServer(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) throws IOException { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, locators); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setEnableBridgeConflation(true); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attributes = factory.create(); + for (int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attributes); + } + CacheServer server = cache.addCacheServer(); + server.setPort(0); + server.setGroups(groups); + server.setLoadProbe(probe); + server.start(); + + remoteObjects.put(CACHE_KEY, cache); + + return new Integer(server.getPort()); } - - protected int startBridgeServerWithEmbeddedLocator(VM vm, final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) { - SerializableCallable connect = - new SerializableCallable("Start bridge server") { - public Object call() throws IOException { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.DISTRIBUTED_ACK); - factory.setEnableBridgeConflation(true); - factory.setDataPolicy(DataPolicy.REPLICATE); - RegionAttributes attrs = factory.create(); - for(int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attrs); - } - CacheServer server = cache.addCacheServer(); - server.setGroups(groups); - server.setLoadProbe(probe); - final int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - server.setPort(serverPort); - server.start(); - - remoteObjects.put(CACHE_KEY, cache); - - return new Integer(serverPort); - } - }; - Integer port = (Integer) vm.invoke(connect); - return port.intValue(); + + protected int startBridgeServerWithEmbeddedLocator(final String[] groups, final String locators, final String[] regions, final ServerLoadProbe probe) + throws IOException { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.START_LOCATOR_NAME, locators); + props.setProperty(DistributionConfig.LOCATORS_NAME, locators); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setEnableBridgeConflation(true); + factory.setDataPolicy(DataPolicy.REPLICATE); + RegionAttributes attrs = factory.create(); + for (int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attrs); + } + CacheServer server = cache.addCacheServer(); + server.setGroups(groups); + server.setLoadProbe(probe); + server.setPort(0); + server.start(); + + remoteObjects.put(CACHE_KEY, cache); + + return new Integer(server.getPort()); } - - protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port) throws Exception { - startBridgeClientInVM(vm, group, host, port, new String[] {REGION_NAME}); + + protected void startBridgeClient(final String group, final String host, final int port) throws Exception { + startBridgeClient(group, host, port, new String[] { REGION_NAME }); } - - protected void startBridgeClientInVM(VM vm, final String group, final String host, final int port, final String[] regions) throws Exception { + protected void startBridgeClient(final String group, final String host, final int port, final String[] regions) throws Exception { PoolFactoryImpl pf = new PoolFactoryImpl(null); pf.addLocator(host, port) - .setServerGroup(group) - .setPingInterval(200) - .setSubscriptionEnabled(true) - .setSubscriptionRedundancy(-1); - startBridgeClientInVM(vm, pf.getPoolAttributes(), regions); + .setServerGroup(group) + .setPingInterval(200) + .setSubscriptionEnabled(true) + .setSubscriptionRedundancy(-1); + startBridgeClient(pf.getPoolAttributes(), regions); } - - protected void startBridgeClientInVM(VM vm, final Pool pool, final String[] regions) throws Exception { - SerializableRunnable connect = - new SerializableRunnable("Start bridge client") { - public void run() { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, ""); - DistributedSystem ds = getSystem(props); - Cache cache = CacheFactory.create(ds); - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); -// factory.setEnableBridgeConflation(true); -// factory.setDataPolicy(DataPolicy.NORMAL); - factory.setPoolName(POOL_NAME); - PoolFactoryImpl pf= (PoolFactoryImpl) PoolManager.createFactory(); - pf.init(pool); - LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback(); - remoteObjects.put(CALLBACK_KEY, locatorCallback); - pf.setLocatorDiscoveryCallback(locatorCallback); - pf.create(POOL_NAME); - - - RegionAttributes attrs = factory.create(); - for(int i = 0; i < regions.length; i++) { - cache.createRegion(regions[i], attrs); - } - - remoteObjects.put(CACHE_KEY, cache); - } - }; - - if(vm == null) { - connect.run(); - } else { - vm.invoke(connect); + + protected void startBridgeClient(final Pool pool, final String[] regions) throws Exception { + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, ""); + DistributedSystem ds = getSystem(props); + Cache cache = CacheFactory.create(ds); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + // factory.setEnableBridgeConflation(true); + // factory.setDataPolicy(DataPolicy.NORMAL); + factory.setPoolName(POOL_NAME); + PoolFactoryImpl pf = (PoolFactoryImpl) PoolManager.createFactory(); + pf.init(pool); + LocatorDiscoveryCallback locatorCallback = new MyLocatorCallback(); + remoteObjects.put(CALLBACK_KEY, locatorCallback); + pf.setLocatorDiscoveryCallback(locatorCallback); + pf.create(POOL_NAME); + + RegionAttributes attrs = factory.create(); + for (int i = 0; i < regions.length; i++) { + cache.createRegion(regions[i], attrs); } + + remoteObjects.put(CACHE_KEY, cache); } - + protected void stopBridgeMemberVM(VM vm) { - vm.invoke(new SerializableRunnable("Stop bridge member") { - public void run() { - Cache cache = (Cache) remoteObjects.remove(CACHE_KEY); - cache.close(); - disconnectFromDS(); - } - }); + vm.invoke(new SerializableRunnable("Stop bridge member") { + public void run() { + Cache cache = (Cache) remoteObjects.remove(CACHE_KEY); + cache.close(); + disconnectFromDS(); + } + }); } - + public String getLocatorString(Host host, int locatorPort) { - return getLocatorString(host, new int[] {locatorPort}); + return getLocatorString(host, new int[] { locatorPort }); } - + public String getLocatorString(Host host, int[] locatorPorts) { StringBuffer str = new StringBuffer(); - for(int i = 0; i < locatorPorts.length; i++) { + for (int i = 0; i < locatorPorts.length; i++) { str.append(NetworkUtils.getServerHostName(host)) .append("[") .append(locatorPorts[i]) .append("]"); - if(i < locatorPorts.length - 1) { + if (i < locatorPorts.length - 1) { str.append(","); } } - + return str.toString(); } - + protected static class MyLocatorCallback extends LocatorDiscoveryCallbackAdapter { private final Set discoveredLocators = new HashSet(); private final Set removedLocators = new HashSet(); - + public synchronized void locatorsDiscovered(List locators) { discoveredLocators.addAll(locators); notifyAll(); @@ -336,29 +266,29 @@ public abstract class LocatorTestBase extends DistributedTestCase { removedLocators.addAll(locators); notifyAll(); } - + public boolean waitForDiscovery(InetSocketAddress locator, long time) throws InterruptedException { return waitFor(discoveredLocators, locator, time); } - + public boolean waitForRemove(InetSocketAddress locator, long time) throws InterruptedException { return waitFor(removedLocators, locator, time); } - + private synchronized boolean waitFor(Set set, InetSocketAddress locator, long time) throws InterruptedException { long remaining = time; long endTime = System.currentTimeMillis() + time; - while(!set.contains(locator) && remaining >= 0) { + while (!set.contains(locator) && remaining >= 0) { wait(remaining); - remaining = endTime - System.currentTimeMillis(); + remaining = endTime - System.currentTimeMillis(); } return set.contains(locator); } - + public synchronized Set getDiscovered() { return new HashSet(discoveredLocators); } - + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java index de43c29..0f08456 100644 --- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java +++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/Bug47667DUnitTest.java @@ -56,11 +56,11 @@ public class Bug47667DUnitTest extends LocatorTestBase { final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); final String locatorHost = NetworkUtils.getServerHostName(host); - startLocatorInVM(locator, locatorPort, ""); + locator.invoke("Start Locator",() ->startLocator(locator.getHost(), locatorPort, "")); String locString = getLocatorString(host, locatorPort); - startBridgeServerInVM(server1, new String[] {"R1"}, locString, new String[] {"R1"}); - startBridgeServerInVM(server2, new String[] {"R2"}, locString, new String[] {"R2"}); + server1.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R1"}, locString, new String[] {"R1"})); + server2.invoke("Start BridgeServer", () -> startBridgeServer(new String[] {"R2"}, locString, new String[] {"R2"})); client.invoke(new SerializableCallable() { @Override @@ -70,15 +70,15 @@ public class Bug47667DUnitTest extends LocatorTestBase { ClientCache cache = ccf.create(); PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R1").create("R1"); PoolManager.createFactory().addLocator(locatorHost, locatorPort).setServerGroup("R2").create("R2"); - Region r1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1"); - Region r2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2"); - CacheTransactionManager mgr = cache.getCacheTransactionManager(); - mgr.begin(); - r1.put(1, "value1"); - mgr.commit(); - mgr.begin(); - r2.put(2, "value2"); - mgr.commit(); + Region region1 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R1").create("R1"); + Region region2 = cache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).setPoolName("R2").create("R2"); + CacheTransactionManager transactionManager = cache.getCacheTransactionManager(); + transactionManager.begin(); + region1.put(1, "value1"); + transactionManager.commit(); + transactionManager.begin(); + region2.put(2, "value2"); + transactionManager.commit(); return null; } }); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c33efb60/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java index d784397..167cc3a 100644 --- a/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java +++ b/geode-cq/src/test/java/com/gemstone/gemfire/management/CacheServerManagementDUnitTest.java @@ -16,26 +16,9 @@ */ package com.gemstone.gemfire.management; -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Collections; -import java.util.Properties; - -import javax.management.InstanceNotFoundException; -import javax.management.MBeanServer; -import javax.management.Notification; -import javax.management.NotificationListener; -import javax.management.ObjectName; - import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.client.internal.LocatorTestBase; -import com.gemstone.gemfire.cache.query.IndexExistsException; -import com.gemstone.gemfire.cache.query.IndexInvalidException; -import com.gemstone.gemfire.cache.query.IndexNameConflictException; -import com.gemstone.gemfire.cache.query.QueryService; -import com.gemstone.gemfire.cache.query.RegionNotFoundException; +import com.gemstone.gemfire.cache.query.*; import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryDUnitTest; import com.gemstone.gemfire.cache.query.internal.cq.CqService; import com.gemstone.gemfire.cache.server.CacheServer; @@ -46,37 +29,34 @@ import com.gemstone.gemfire.internal.AvailablePort; import com.gemstone.gemfire.internal.AvailablePortHelper; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.management.internal.JmxManagerLocatorRequest; -import com.gemstone.gemfire.management.internal.JmxManagerLocatorResponse; import com.gemstone.gemfire.management.internal.MBeanJMXAdapter; import com.gemstone.gemfire.management.internal.SystemManagementService; -import com.gemstone.gemfire.test.dunit.Assert; -import com.gemstone.gemfire.test.dunit.Host; -import com.gemstone.gemfire.test.dunit.LogWriterUtils; -import com.gemstone.gemfire.test.dunit.NetworkUtils; -import com.gemstone.gemfire.test.dunit.SerializableRunnable; -import com.gemstone.gemfire.test.dunit.VM; -import com.gemstone.gemfire.test.dunit.Wait; -import com.gemstone.gemfire.test.dunit.WaitCriterion; +import com.gemstone.gemfire.test.dunit.*; + +import javax.management.*; +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Properties; /** * Cache Server related management test cases - * - * */ public class CacheServerManagementDUnitTest extends LocatorTestBase { private static final long serialVersionUID = 1L; - - private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000; + + private static int CONNECT_LOCATOR_TIMEOUT_MS = 30000; private ManagementTestBase helper; private static final String queryName = "testClientWithFeederAndCQ_0"; private static final String indexName = "testIndex"; - + private static MBeanServer mbeanServer = MBeanJMXAdapter.mbeanServer; - protected CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest( "CqDataDUnitTest"); @@ -84,7 +64,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { public CacheServerManagementDUnitTest(String name) { super(name); this.helper = new ManagementTestBase(name); - + } @Override @@ -98,7 +78,6 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { } /** - * * @throws Exception */ public void testCacheServerMBean() throws Exception { @@ -112,12 +91,11 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { helper.startManagingNode(managingNode); //helper.createCache(server); int serverPort = AvailablePortHelper.getRandomAvailableTCPPort(); - cqDUnitTest.createServer(server,serverPort); - - + cqDUnitTest.createServer(server, serverPort); + DistributedMember member = helper.getMember(server); - - verifyCacheServer(server,serverPort); + + verifyCacheServer(server, serverPort); final int port = server.invoke(() -> CqQueryDUnitTest.getCacheServerPort()); final String host0 = NetworkUtils.getServerHostName(server.getHost()); @@ -145,10 +123,10 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { // Close. Wait.pause(2000); - checkNavigation(managingNode,member,serverPort); - verifyIndex(server,serverPort); + checkNavigation(managingNode, member, serverPort); + verifyIndex(server, serverPort); // This will test all CQs and will close the cq in its final step - verifyCacheServerRemote(managingNode, member,serverPort); + verifyCacheServerRemote(managingNode, member, serverPort); verifyClosedCQ(server); @@ -162,30 +140,30 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Test for client server connection related management artifacts - * like notifications + * like notifications + * * @throws Exception */ - + public void testCacheClient() throws Exception { - + final Host host = Host.getHost(0); VM locator = host.getVM(0); VM server = host.getVM(1); VM client = host.getVM(2); - + int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocatorInVM(locator, locatorPort, ""); - - String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]"; - - - int serverPort = startBridgeServerInVM(server, null, locators); - - addClientNotifListener(server,serverPort); + locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); + + String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; + + int serverPort = server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); + + addClientNotifListener(server, serverPort); // Start a client and make sure that proper notification is received - startBridgeClientInVM(client, null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort); - + client.invoke("Start BridgeClient", () -> startBridgeClient(null, NetworkUtils.getServerHostName(locator.getHost()), locatorPort)); + //stop the client and make sure the bridge server notifies stopBridgeMemberVM(client); helper.closeCache(locator); @@ -193,13 +171,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { helper.closeCache(client); } - + /** * Intention of this test is to check if a node becomes manager after all the nodes are alive * it should have all the information of all the members. - * + * <p> * Thats why used service.getLocalManager().runManagementTaskAdhoc() to make node * ready for federation when manager node comes up + * * @throws Exception */ @@ -208,96 +187,72 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { final Host host = Host.getHost(0); VM locator = host.getVM(0); VM server = host.getVM(1); - + //Step 1: final int locatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); - startLocator(locator, locatorPort, ""); + locator.invoke("Start Locator", () -> startLocator(locator.getHost(), locatorPort, "")); - String locators = NetworkUtils.getServerHostName(locator.getHost())+ "[" + locatorPort + "]"; - - //Step 2: - int serverPort = startBridgeServerInVM(server, null, locators); - - //Step 3: - server.invoke(new SerializableRunnable("Check Server") { - - public void run() { - Cache cache = GemFireCacheImpl.getInstance(); - assertNotNull(cache); - SystemManagementService service = (SystemManagementService)ManagementService - .getExistingManagementService(cache); - assertNotNull(service); - assertFalse(service.isManager()); - assertNotNull(service.getMemberMXBean()); - service.getLocalManager().runManagementTaskAdhoc(); + String locators = NetworkUtils.getServerHostName(locator.getHost()) + "[" + locatorPort + "]"; + //Step 2: + server.invoke("Start BridgeServer", () -> startBridgeServer(null, locators)); - } + //Step 3: + server.invoke("Check Server", () -> { + Cache cache = GemFireCacheImpl.getInstance(); + assertNotNull(cache); + SystemManagementService service = (SystemManagementService) ManagementService + .getExistingManagementService(cache); + assertNotNull(service); + assertFalse(service.isManager()); + assertNotNull(service.getMemberMXBean()); + service.getLocalManager().runManagementTaskAdhoc(); }); - - //Step 4: - JmxManagerLocatorResponse locRes = JmxManagerLocatorRequest.send(locator - .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String> emptyMap()); - - //Step 5: - locator.invoke(new SerializableRunnable("Check locator") { - public void run() { - Cache cache = GemFireCacheImpl.getInstance(); - assertNotNull(cache); - ManagementService service = ManagementService - .getExistingManagementService(cache); - assertNotNull(service); - assertTrue(service.isManager()); - LocatorMXBean bean = service.getLocalLocatorMXBean(); - assertEquals(locatorPort, bean.getPort()); - DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); - ObjectName[] names = dsBean.listMemberObjectNames(); - - assertEquals(2,dsBean.listMemberObjectNames().length); - - } + //Step 4: + JmxManagerLocatorRequest.send(locator + .getHost().getHostName(), locatorPort, CONNECT_LOCATOR_TIMEOUT_MS, Collections.<String, String>emptyMap()); + + //Step 5: + locator.invoke("Check locator", () -> { + Cache cache = GemFireCacheImpl.getInstance(); + assertNotNull(cache); + ManagementService service = ManagementService + .getExistingManagementService(cache); + assertNotNull(service); + assertTrue(service.isManager()); + LocatorMXBean bean = service.getLocalLocatorMXBean(); + assertEquals(locatorPort, bean.getPort()); + DistributedSystemMXBean dsBean = service.getDistributedSystemMXBean(); + + assertEquals(2, dsBean.listMemberObjectNames().length); }); - - helper.closeCache(locator); helper.closeCache(server); - - + } - - - protected void startLocator(final VM vm, final int locatorPort, final String otherLocators) { - vm.invoke(new SerializableRunnable("Create Locator") { - final String testName= getUniqueName(); - public void run() { - disconnectFromDS(); - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); - props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); - props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0"); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); - try { - File logFile = new File(testName + "-locator" + locatorPort - + ".log"); - InetAddress bindAddr = null; - try { - bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vm.getHost())); - } catch (UnknownHostException uhe) { - Assert.fail("While resolving bind address ", uhe); - } - Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); - remoteObjects.put(LOCATOR_KEY, locator); - } catch (IOException ex) { - Assert.fail("While starting locator on port " + locatorPort, ex); - } - } - }); + protected void startLocator(Host vmHost, final int locatorPort, final String otherLocators) { + disconnectFromDS(); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, String.valueOf(0)); + props.setProperty(DistributionConfig.LOCATORS_NAME, otherLocators); + props.setProperty(DistributionConfig.LOG_LEVEL_NAME, LogWriterUtils.getDUnitLogLevel()); + props.setProperty(DistributionConfig.JMX_MANAGER_HTTP_PORT_NAME, "0"); + props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + File logFile = new File(getUniqueName() + "-locator" + locatorPort + ".log"); + try { + InetAddress bindAddr = InetAddress.getByName(NetworkUtils.getServerHostName(vmHost)); + Locator locator = Locator.startLocatorAndDS(locatorPort, logFile, bindAddr, props); + remoteObjects.put(LOCATOR_KEY, locator); + } catch (UnknownHostException uhe) { + Assert.fail("While resolving bind address ", uhe); + } catch (IOException ex) { + Assert.fail("While starting locator on port " + locatorPort, ex); + } } - + protected void checkNavigation(final VM vm, final DistributedMember cacheServerMember, final int serverPort) { SerializableRunnable checkNavigation = new SerializableRunnable( @@ -327,14 +282,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { }; vm.invoke(checkNavigation); } - + /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") - protected void addClientNotifListener(final VM vm , final int serverPort) throws Exception { + protected void addClientNotifListener(final VM vm, final int serverPort) throws Exception { SerializableRunnable addClientNotifListener = new SerializableRunnable( "Add Client Notif Listener") { public void run() { @@ -359,18 +314,19 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { TestCacheServerNotif nt = new TestCacheServerNotif(); try { mbeanServer.addNotificationListener(MBeanJMXAdapter - .getClientServiceMBeanName(serverPort,cache.getDistributedSystem().getMemberId()), nt, null, null); + .getClientServiceMBeanName(serverPort, cache.getDistributedSystem().getMemberId()), nt, null, null); } catch (InstanceNotFoundException e) { fail("Failed With Exception " + e); } - + } }; vm.invoke(addClientNotifListener); } + /** * Verify the closed CQ which is closed from Managing Node - * + * * @param vm */ @SuppressWarnings("serial") @@ -405,7 +361,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { bean.removeIndex(indexName); } catch (Exception e) { fail("Failed With Exception " + e); - + } assertEquals(bean.getIndexCount(), 0); @@ -416,7 +372,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Verify the closed CQ which is closed from Managing Node - * + * * @param vm */ @SuppressWarnings("serial") @@ -434,11 +390,9 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { vm.invoke(verifyClosedCQ); } - - /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") @@ -496,7 +450,7 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { /** * Verify the Cache Server details - * + * * @param vm */ @SuppressWarnings("serial") @@ -522,14 +476,14 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { LogWriterUtils.getLogWriter().info( "<ExpectedString> Active Query Count " + bean.getActiveCQCount() + "</ExpectedString> "); - + LogWriterUtils.getLogWriter().info( "<ExpectedString> Registered Query Count " + bean.getRegisteredCQCount() + "</ExpectedString> "); - assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1); - int numQueues = bean.getNumSubscriptions(); - assertEquals(numQueues, 1); + assertTrue(bean.showAllClientStats()[0].getClientCQCount() == 1); + int numQueues = bean.getNumSubscriptions(); + assertEquals(numQueues, 1); // test for client connection Count /* @TODO */ @@ -552,11 +506,9 @@ public class CacheServerManagementDUnitTest extends LocatorTestBase { }; vm.invoke(verifyCacheServerRemote); } - + /** * Notification handler - * - * */ private static class TestCacheServerNotif implements NotificationListener {