// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
// new file mode 100644
// index 0000000..91df070
// --- /dev/null
// +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
// @@ -0,0 +1,390 @@
/*=========================================================================
 * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
 * This product is protected by U.S. and international copyright
 * and intellectual property laws. Pivotal products are covered by
 * one or more patents listed at http://www.pivotal.io/patents.
 *=========================================================================
 */
package com.gemstone.gemfire.security;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;

import security.AuthzCredentialGenerator;
import security.CredentialGenerator;
import com.gemstone.gemfire.cache.operations.OperationContext.OperationCode;
import com.gemstone.gemfire.internal.AvailablePort;

import dunit.Host;

/**
 * Tests for authorization from client to server. This tests for authorization
 * with post-process callbacks in case return values of operations and for
 * notifications along-with failover.
 *
 * @author sumedh
 * @since 5.5
 */
public class ClientPostAuthorizationDUnitTest extends
    ClientAuthorizationTestBase {


  /** constructor */
  public ClientPostAuthorizationDUnitTest(String name) {
    super(name);
  }

  /**
   * Assigns the four dunit VMs (two servers, two clients) and registers the
   * expected-exception lists on both servers, on client2 and in this VM.
   */
  @Override
  public void setUp() throws Exception {

    super.setUp();
    final Host host = Host.getHost(0);
    server1 = host.getVM(0);
    server2 = host.getVM(1);
    client1 = host.getVM(2);
    client2 = host.getVM(3);

    server1.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] { serverExpectedExceptions });
    server2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] { serverExpectedExceptions });
    client2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] { clientExpectedExceptions });
    SecurityTestUtil.registerExpectedExceptions(clientExpectedExceptions);
  }

  // Region: Tests

  /**
   * Runs PUT operations followed by GET, KEY_SET, QUERY and EXECUTE_CQ
   * verifications, once for every credential generator combination returned
   * by {@code getDummyGeneratorCombos()}. Each operation block is executed
   * against the first server and then again after failing over to the second.
   */
  public void testAllPostOps() {

    OperationWithAction[] allOps = {
        // Test CREATE and verify with a GET
        new OperationWithAction(OperationCode.PUT),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.CHECK_NOKEY, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.CHECK_NOKEY
            | OpFlags.CHECK_NOTAUTHZ, 4),

        // OPBLOCK_END indicates end of an operation block that needs to
        // be executed on each server when doing failover
        OperationWithAction.OPBLOCK_END,

        // Test UPDATE and verify with a GET
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
            | OpFlags.USE_NEWVAL, 4),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.USE_NEWVAL, 4),
        new OperationWithAction(OperationCode.GET, 3,
            OpFlags.USE_OLDCONN | OpFlags.CHECK_NOKEY | OpFlags.USE_NEWVAL
                | OpFlags.CHECK_NOTAUTHZ, 4),

        OperationWithAction.OPBLOCK_END,

        // Test UPDATE and verify with a KEY_SET
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 6),
        new OperationWithAction(OperationCode.KEY_SET, 2, OpFlags.NONE, 6),
        new OperationWithAction(OperationCode.KEY_SET, 3,
            OpFlags.CHECK_NOTAUTHZ, 6),

        OperationWithAction.OPBLOCK_END,

        // Test UPDATE and verify with a QUERY
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
            | OpFlags.USE_NEWVAL, 7),
        new OperationWithAction(OperationCode.QUERY, 2, OpFlags.USE_NEWVAL, 7),
        new OperationWithAction(OperationCode.QUERY, 3, OpFlags.USE_NEWVAL
            | OpFlags.CHECK_NOTAUTHZ, 7),

        OperationWithAction.OPBLOCK_END,

        // Test UPDATE and verify with a EXECUTE_CQ initial results
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),
        new OperationWithAction(OperationCode.EXECUTE_CQ, 2, OpFlags.NONE, 8),
        new OperationWithAction(OperationCode.EXECUTE_CQ, 3,
            OpFlags.CHECK_NOTAUTHZ, 8),

        OperationWithAction.OPBLOCK_END };

    // Iterating as Object works whether the helper returns a raw or a typed
    // list; the cast mirrors the one the original iterator loop performed.
    for (Object combo : getDummyGeneratorCombos()) {
      AuthzCredentialGenerator gen = (AuthzCredentialGenerator)combo;
      CredentialGenerator cGen = gen.getCredentialGenerator();
      Properties extraAuthProps = cGen.getSystemProperties();
      Properties javaProps = cGen.getJavaProperties();
      Properties extraAuthzProps = gen.getSystemProperties();
      String authenticator = cGen.getAuthenticator();
      String authInit = cGen.getAuthInit();
      String accessor = gen.getAuthorizationCallback();
      TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(gen);

      getLogWriter().info("testAllPostOps: Using authinit: " + authInit);
      getLogWriter().info(
          "testAllPostOps: Using authenticator: " + authenticator);
      getLogWriter().info("testAllPostOps: Using accessor: " + accessor);

      // Start servers with all required properties
      Properties serverProps = buildProperties(authenticator, accessor, true,
          extraAuthProps, extraAuthzProps);
      // Get ports for the servers
      Integer port1 = Integer.valueOf(AvailablePort
          .getRandomAvailablePort(AvailablePort.SOCKET));
      Integer port2 = Integer.valueOf(AvailablePort
          .getRandomAvailablePort(AvailablePort.SOCKET));

      // Close down any running servers
      server1.invoke(SecurityTestUtil.class, "closeCache");
      server2.invoke(SecurityTestUtil.class, "closeCache");

      // Perform all the ops on the clients
      runOpBlocksWithFailover(allOps, port1, port2, authInit, extraAuthProps,
          extraAuthzProps, javaProps, serverProps, tgen);
    }
  }

  /**
   * Registers interest from the clients and then checks, using local GETs,
   * which notifications arrive after PUT, DESTROY, REGION_CLEAR,
   * REGION_CREATE and REGION_DESTROY operations. Uses the generator returned
   * by {@code getXmlAuthzGenerator()}; the final block is run without
   * failover.
   */
  public void testAllOpsNotifications() {

    OperationWithAction[] allOps = {
        // Test CREATE and verify with a GET
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_REGEX
                | OpFlags.REGISTER_POLICY_NONE, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 3, OpFlags.USE_REGEX
                | OpFlags.REGISTER_POLICY_NONE | OpFlags.USE_NOTAUTHZ, 8),
        new OperationWithAction(OperationCode.PUT),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 4),

        // OPBLOCK_END indicates end of an operation block that needs to
        // be executed on each server when doing failover
        OperationWithAction.OPBLOCK_END,

        // Test UPDATE and verify with a GET
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_REGEX
                | OpFlags.REGISTER_POLICY_NONE, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 3, OpFlags.USE_REGEX
                | OpFlags.REGISTER_POLICY_NONE | OpFlags.USE_NOTAUTHZ, 8),
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
            | OpFlags.USE_NEWVAL, 4),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_NEWVAL, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_NEWVAL | OpFlags.CHECK_FAIL, 4),

        OperationWithAction.OPBLOCK_END,

        // Test DESTROY and verify with GET that keys should not exist
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_REGEX, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
            OpFlags.USE_REGEX | OpFlags.USE_OLDCONN | OpFlags.REGISTER_POLICY_NONE, 8),
        // registerInterest now clears the keys, so a dummy put to add
        // those keys back for the case when updates should not come
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
        new OperationWithAction(OperationCode.DESTROY, 1, OpFlags.USE_OLDCONN,
            4),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP, 4),
        // Repopulate the region
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),

        OperationWithAction.OPBLOCK_END,

        // Do REGION_CLEAR and check with GET
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS, 1),
        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
            OpFlags.USE_ALL_KEYS | OpFlags.USE_OLDCONN | OpFlags.REGISTER_POLICY_NONE, 1),
        // registerInterest now clears the keys, so a dummy put to add
        // those keys back for the case when updates should not come
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
        new OperationWithAction(OperationCode.REGION_CLEAR, 1,
            OpFlags.USE_OLDCONN, 1),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 8),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP, 8),
        // Repopulate the region
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),

        OperationWithAction.OPBLOCK_END,

        // Do REGION_CREATE and check with CREATE/GET
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS | OpFlags.ENABLE_DRF, 1),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 3, OpFlags.USE_ALL_KEYS | OpFlags.ENABLE_DRF
                | OpFlags.USE_NOTAUTHZ | OpFlags.REGISTER_POLICY_NONE, 1),
        new OperationWithAction(OperationCode.REGION_CREATE, 1,
            OpFlags.ENABLE_DRF, 1),
        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
            | OpFlags.USE_SUBREGION, 4),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
            | OpFlags.NO_CREATE_SUBREGION, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),

        // Do REGION_DESTROY of the sub-region and check with GET
        new OperationWithAction(OperationCode.REGION_DESTROY, 1,
            OpFlags.USE_OLDCONN | OpFlags.USE_SUBREGION, 1),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.USE_SUBREGION | OpFlags.CHECK_NOKEY
            | OpFlags.CHECK_EXCEPTION, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.USE_SUBREGION | OpFlags.CHECK_NOKEY
            | OpFlags.CHECK_EXCEPTION, 4),

        OperationWithAction.OPBLOCK_END,

        // Do REGION_DESTROY of the region and check with GET
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
        new OperationWithAction(OperationCode.REGISTER_INTEREST,
            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS, 1),
        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
            OpFlags.USE_ALL_KEYS | OpFlags.USE_OLDCONN | OpFlags.REGISTER_POLICY_NONE, 1),
        // registerInterest now clears the keys, so a dummy put to add
        // those keys back for the case when updates should not come
        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
        new OperationWithAction(OperationCode.REGION_DESTROY, 1, OpFlags.NONE,
            1),
        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP | OpFlags.CHECK_NOREGION, 4),
        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
            | OpFlags.LOCAL_OP, 4),

        OperationWithAction.OPBLOCK_NO_FAILOVER };

    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
    getLogWriter().info("Executing opblocks with credential generator " + gen);
    CredentialGenerator cGen = gen.getCredentialGenerator();
    Properties extraAuthProps = cGen.getSystemProperties();
    Properties javaProps = cGen.getJavaProperties();
    Properties extraAuthzProps = gen.getSystemProperties();
    String authenticator = cGen.getAuthenticator();
    String authInit = cGen.getAuthInit();
    String accessor = gen.getAuthorizationCallback();
    TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(gen);

    getLogWriter().info(
        "testAllOpsNotifications: Using authinit: " + authInit);
    getLogWriter().info(
        "testAllOpsNotifications: Using authenticator: " + authenticator);
    getLogWriter().info(
        "testAllOpsNotifications: Using accessor: " + accessor);

    // Start servers with all required properties
    Properties serverProps = buildProperties(authenticator, accessor, true,
        extraAuthProps, extraAuthzProps);
    // Get ports for the servers
    Integer port1 = Integer.valueOf(AvailablePort
        .getRandomAvailablePort(AvailablePort.SOCKET));
    Integer port2 = Integer.valueOf(AvailablePort
        .getRandomAvailablePort(AvailablePort.SOCKET));

    // Perform all the ops on the clients
    runOpBlocksWithFailover(allOps, port1, port2, authInit, extraAuthProps,
        extraAuthzProps, javaProps, serverProps, tgen);
  }

  // End Region: Tests

  /**
   * Walks {@code allOps}, collecting operations into a block until an
   * {@code OPBLOCK_END} or {@code OPBLOCK_NO_FAILOVER} marker is reached, and
   * then executes the collected block.
   * <p>
   * A non-empty block is first executed with server1 started on
   * {@code port1} and server2 closed. Unless the marker is
   * {@code OPBLOCK_NO_FAILOVER}, server2 is then started on {@code port2},
   * server1 is closed and the same block is executed a second time. Each
   * operation is stamped with its index in {@code allOps} before being added
   * to a block.
   *
   * @param allOps operations interleaved with block-end markers
   * @param port1 port on which server1 is started
   * @param port2 port on which server2 is started
   * @param authInit passed through to {@code executeOpBlock}
   * @param extraAuthProps passed through to {@code executeOpBlock}
   * @param extraAuthzProps passed through to {@code executeOpBlock}
   * @param javaProps passed to {@code createCacheServer} on the server VMs
   * @param serverProps passed to {@code createCacheServer} on the server VMs
   * @param tgen passed through to {@code executeOpBlock}
   */
  private void runOpBlocksWithFailover(OperationWithAction[] allOps,
      Integer port1, Integer port2, String authInit,
      Properties extraAuthProps, Properties extraAuthzProps,
      Properties javaProps, Properties serverProps,
      TestAuthzCredentialGenerator tgen) {

    List<OperationWithAction> opBlock = new ArrayList<OperationWithAction>();
    // One generator is shared by every block of a run, as before.
    Random rnd = new Random();
    for (int opNum = 0; opNum < allOps.length; ++opNum) {
      // Start client with valid credentials as specified in
      // OperationWithAction
      OperationWithAction currentOp = allOps[opNum];
      boolean noFailover = currentOp
          .equals(OperationWithAction.OPBLOCK_NO_FAILOVER);
      boolean endOfBlock = noFailover
          || currentOp.equals(OperationWithAction.OPBLOCK_END);

      if (!endOfBlock) {
        currentOp.setOpNum(opNum);
        opBlock.add(currentOp);
        continue;
      }
      if (opBlock.isEmpty()) {
        // Marker with nothing collected before it: nothing to execute.
        continue;
      }

      // End of current operation block; execute all the operations
      // on the servers with failover.
      // Start the first server and execute the operation block
      server1.invoke(ClientAuthorizationTestBase.class,
          "createCacheServer", new Object[] {
              SecurityTestUtil.getLocatorPort(), port1, serverProps,
              javaProps });
      server2.invoke(SecurityTestUtil.class, "closeCache");
      executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
          extraAuthzProps, tgen, rnd);
      if (!noFailover) {
        // Failover to the second server and run the block again
        server2.invoke(ClientAuthorizationTestBase.class,
            "createCacheServer", new Object[] {
                SecurityTestUtil.getLocatorPort(), port2, serverProps,
                javaProps });
        server1.invoke(SecurityTestUtil.class, "closeCache");
        executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
            extraAuthzProps, tgen, rnd);
      }
      opBlock.clear();
    }
  }

  /**
   * Closes the caches in the client VMs and in this VM first, then in the
   * server VMs.
   */
  @Override
  public void tearDown2() throws Exception {

    super.tearDown2();
    // close the clients first
    client1.invoke(SecurityTestUtil.class, "closeCache");
    client2.invoke(SecurityTestUtil.class, "closeCache");
    SecurityTestUtil.closeCache();
    // then close the servers
    server1.invoke(SecurityTestUtil.class, "closeCache");
    server2.invoke(SecurityTestUtil.class, "closeCache");
  }

}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
// new file mode 100644
// index 0000000..9c5d18a
// --- /dev/null
// +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
// @@ -0,0 +1,382 @@
/*=========================================================================
 * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
 * This product is protected by U.S. and international copyright
 * and intellectual property laws. Pivotal products are covered by
 * one or more patents listed at http://www.pivotal.io/patents.
 *=========================================================================
 */
package com.gemstone.gemfire.security;

import hydra.Log;

import java.io.IOException;
import java.util.Properties;

import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;

import security.CredentialGenerator;

import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.client.Pool;
import com.gemstone.gemfire.cache.execute.FunctionService;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqException;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.Query;
import com.gemstone.gemfire.distributed.internal.DistributionConfig;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.PoolManagerImpl;

import dunit.Host;
import dunit.VM;
import security.DummyCredentialGenerator;

/**
 * Verifies which client APIs are rejected depending on whether the client
 * was created with multiuser-authentication enabled. In single-user mode
 * creating an authenticated view must fail with
 * {@link IllegalStateException}; in multi-user mode a list of region,
 * function-service and query-service calls must each fail with
 * {@link UnsupportedOperationException}.
 */
public class MultiuserAPIDUnitTest extends ClientAuthorizationTestBase {

  /** constructor */
  public MultiuserAPIDUnitTest(String name) {
    super(name);
  }

  // NOTE(review): sibling tests in this package use VM fields and
  // expected-exception arrays with these same names without declaring them,
  // so these presumably hide inherited members -- confirm against
  // ClientAuthorizationTestBase before removing.
  private VM server1 = null;

  private VM server2 = null;

  private VM client1 = null;

  private VM client2 = null;

  private static final String[] serverExpectedExceptions = {
      AuthenticationRequiredException.class.getName(),
      AuthenticationFailedException.class.getName(),
      GemFireSecurityException.class.getName(),
      ClassNotFoundException.class.getName(), IOException.class.getName(),
      SSLException.class.getName(), SSLHandshakeException.class.getName()};

  private static final String[] clientExpectedExceptions = {
      AuthenticationRequiredException.class.getName(),
      AuthenticationFailedException.class.getName(),
      SSLHandshakeException.class.getName()};

  /**
   * Assigns the four dunit VMs (two servers, two clients) and registers the
   * expected-exception lists on each of them.
   */
  @Override
  public void setUp() throws Exception {
    super.setUp();
    final Host host = Host.getHost(0);
    server1 = host.getVM(0);
    server2 = host.getVM(1);
    client1 = host.getVM(2);
    client2 = host.getVM(3);

    server1.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] {serverExpectedExceptions});
    server2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] {serverExpectedExceptions});
    client1.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] {clientExpectedExceptions});
    client2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
        new Object[] {clientExpectedExceptions});
  }

  /**
   * Creates a cache server through {@code SecurityTestUtil}, expecting no
   * exception. Parameters are typed as {@code Object} because the method is
   * invoked by name through {@code VM.invoke}.
   *
   * @param dsPort cast to {@code Integer} and forwarded
   * @param locatorString cast to {@code String} and forwarded
   * @param authenticator if non-null, its string form is set as the client
   *        authenticator property
   * @param extraProps a {@code Properties} to add the authenticator to (it is
   *        modified in place), or {@code null} to start from an empty one
   * @param javaProps forwarded unchanged
   * @return the value returned by {@code SecurityTestUtil.createCacheServer}
   */
  public static Integer createCacheServer(Object dsPort, Object locatorString,
      Object authenticator, Object extraProps, Object javaProps) {

    Properties authProps;
    if (extraProps == null) {
      authProps = new Properties();
    } else {
      authProps = (Properties)extraProps;
    }
    if (authenticator != null) {
      authProps.setProperty(
          DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME, authenticator
              .toString());
    }
    return SecurityTestUtil.createCacheServer(authProps, javaProps,
        (Integer)dsPort, (String)locatorString, null, Integer.valueOf(
            SecurityTestUtil.NO_EXCEPTION));
  }

  /**
   * Creates a cache client through {@code SecurityTestUtil}, converting
   * {@code authInit} and {@code multiUserMode} to the string forms that
   * helper is called with.
   */
  private static void createCacheClient(Object authInit, Properties authProps,
      Properties javaProps, Integer[] ports, Object numConnections,
      Boolean multiUserMode, Integer expectedResult) {

    String authInitStr = (authInit == null ? null : authInit.toString());
    SecurityTestUtil.createCacheClient(authInitStr, authProps, javaProps,
        ports, numConnections, multiUserMode.toString(), expectedResult);
  }

  /**
   * Entry point invoked by name in a client VM; packs the two server ports
   * into an array and delegates to the private overload.
   */
  public static void createCacheClient(Object authInit, Object authProps,
      Object javaProps, Integer port1, Integer port2, Object numConnections,
      Boolean multiUserMode, Integer expectedResult) {

    createCacheClient(authInit, (Properties)authProps, (Properties)javaProps,
        new Integer[] {port1, port2}, numConnections, multiUserMode,
        expectedResult);
  }

  /** Registers interest in all keys (regex {@code ".*"}) on the test region. */
  public static void registerAllInterest() {
    Region region = SecurityTestUtil.getCache().getRegion(
        SecurityTestUtil.regionName);
    assertNotNull(region);
    region.registerInterestRegex(".*");
  }

  /**
   * Starts both cache servers and then client1 with valid credentials.
   *
   * @param gen supplies authenticator, auth-init, system/java properties and
   *        credentials
   * @param multiUser whether client1 is created in multi-user mode
   */
  private void setUpVMs(CredentialGenerator gen, Boolean multiUser) {
    Properties extraProps = gen.getSystemProperties();
    Properties javaProps = gen.getJavaProperties();
    String authenticator = gen.getAuthenticator();
    String authInit = gen.getAuthInit();

    getLogWriter().info(
        "setUpVMs: Using scheme: " + gen.classCode());
    getLogWriter().info(
        "setUpVMs: Using authenticator: " + authenticator);
    getLogWriter().info("setUpVMs: Using authinit: " + authInit);

    // Start the servers
    Integer locPort1 = SecurityTestUtil.getLocatorPort();
    Integer locPort2 = SecurityTestUtil.getLocatorPort();
    String locString = SecurityTestUtil.getLocatorString();
    Integer port1 = (Integer)server1.invoke(MultiuserAPIDUnitTest.class,
        "createCacheServer", new Object[] {locPort1, locString, authenticator,
            extraProps, javaProps});
    Integer port2 = (Integer)server2.invoke(MultiuserAPIDUnitTest.class,
        "createCacheServer", new Object[] {locPort2, locString, authenticator,
            extraProps, javaProps});

    // Start the clients with valid credentials
    Properties credentials1 = gen.getValidCredentials(1);
    Properties javaProps1 = gen.getJavaProperties();
    getLogWriter().info(
        "setUpVMs: For first client credentials: " + credentials1
            + " : " + javaProps1);
    // NOTE(review): the second set of credentials is only logged; no cache
    // client is created in client2 here -- confirm that is intended.
    Properties credentials2 = gen.getValidCredentials(2);
    Properties javaProps2 = gen.getJavaProperties();
    getLogWriter().info(
        "setUpVMs: For second client credentials: " + credentials2
            + " : " + javaProps2);
    client1.invoke(MultiuserAPIDUnitTest.class, "createCacheClient",
        new Object[] {authInit, credentials1, javaProps1, port1, port2, null,
            multiUser, Integer.valueOf(SecurityTestUtil.NO_EXCEPTION)});
  }

  /** Checks the disallowed operation for a client in single-user mode. */
  public void testSingleUserUnsupportedAPIs() {
    // Start servers
    // Start clients with multiuser-authentication set to false
    setUpVMs(new DummyCredentialGenerator(), Boolean.FALSE);
    client1.invoke(MultiuserAPIDUnitTest.class, "verifyDisallowedOps",
        new Object[] {Boolean.FALSE});
  }

  /** Checks the disallowed operations for a client in multi-user mode. */
  public void testMultiUserUnsupportedAPIs() {
    // Start servers.
    // Start clients with multiuser-authentication set to true.
    setUpVMs(new DummyCredentialGenerator(), Boolean.TRUE);
    client1.invoke(MultiuserAPIDUnitTest.class, "verifyDisallowedOps",
        new Object[] {Boolean.TRUE});
  }

  /**
   * Returns the exception that should be reported for a failed CQ execution:
   * the cause of {@code ce} when it is an {@code Exception}, otherwise
   * {@code ce} itself. This keeps the original {@code CqException} visible
   * when it has no cause (or a non-{@code Exception} cause) instead of
   * replacing it with a {@code NullPointerException} or
   * {@code ClassCastException}.
   */
  private static Exception causeOrSelf(CqException ce) {
    Throwable cause = ce.getCause();
    return (cause instanceof Exception) ? (Exception)cause : ce;
  }

  /**
   * Runs in a client VM and asserts that the operations which are not
   * allowed in the given mode throw the expected exception.
   *
   * @param multiuserMode {@code false}: creating an authenticated view must
   *        throw {@link IllegalStateException}; {@code true}: each of the 28
   *        operations below must throw
   *        {@link UnsupportedOperationException}. Any other outcome fails the
   *        test.
   */
  public static void verifyDisallowedOps(Boolean multiuserMode) {
    String op = "unknown";
    boolean success = false;
    if (!multiuserMode) {
      success = false;
      try {
        // Attempt createAuthenticatedView() on the cache and expect an
        // exception, fail otherwise
        op = "GemFireCacheImpl.createAuthenticatedView()";
        GemFireCacheImpl.getInstance().createAuthenticatedView(new Properties(), "testPool");
      } catch (IllegalStateException uoe) {
        Log.getLogWriter().info(op + ": Got expected exception: " + uoe);
        success = true;
      } catch (Exception e) {
        fail("Got unexpected exception while doing " + op, e);
      }
      if (!success) {
        fail("Did not get exception while doing " + op);
      }
    } else { // multiuser mode
      Region realRegion = GemFireCacheImpl.getInstance().getRegion(
          SecurityTestUtil.regionName);
      Region proxyRegion = SecurityTestUtil.proxyCaches[0]
          .getRegion(SecurityTestUtil.regionName);
      Pool pool = PoolManagerImpl.getPMI().find("testPool");
      for (int i = 0; i <= 27; i++) {
        success = false;
        try {
          switch (i) {
            // Attempt (real) Region.create/put/get/containsKeyOnServer/destroy/
            // destroyRegion/clear/remove/registerInterest/unregisterInterest()
            // and expect an exception, fail otherwise.
            case 0:
              op = "Region.create()";
              realRegion.create("key", "value");
              break;
            case 1:
              op = "Region.put()";
              realRegion.put("key", "value");
              break;
            case 2:
              op = "Region.get()";
              realRegion.get("key");
              break;
            case 3:
              op = "Region.containsKeyOnServer()";
              realRegion.containsKeyOnServer("key");
              break;
            case 4:
              op = "Region.remove()";
              realRegion.remove("key");
              break;
            case 5:
              op = "Region.destroy()";
              realRegion.destroy("key");
              break;
            case 6:
              op = "Region.destroyRegion()";
              realRegion.destroyRegion();
              break;
            case 7:
              op = "Region.registerInterest()";
              realRegion.registerInterest("key");
              break;
            // case 8:
            // op = "Region.unregisterInterest()";
            // realRegion.unregisterInterest("key");
            // break;
            case 8:
              op = "Region.clear()";
              realRegion.clear();
              break;
            // Attempt ProxyRegion.createSubregion/forceRolling/
            // getAttributesMutator/registerInterest/loadSnapShot/saveSnapshot/
            // setUserAttribute/unregisterInterest/writeToDisk
            // and expect an exception, fail otherwise.
            case 9:
              op = "ProxyRegion.createSubregion()";
              proxyRegion.createSubregion("subregion",
                  null);
              break;
            case 10:
              op = "ProxyRegion.forceRolling()";
              proxyRegion.forceRolling();
              break;
            case 11:
              op = "ProxyRegion.getAttributesMutator()";
              proxyRegion.getAttributesMutator();
              break;
            case 12:
              op = "ProxyRegion.registerInterest()";
              proxyRegion.registerInterest("key");
              break;
            case 13:
              op = "ProxyRegion.loadSnapshot()";
              proxyRegion.loadSnapshot(null);
              break;
            case 14:
              op = "ProxyRegion.saveSnapshot()";
              proxyRegion.saveSnapshot(null);
              break;
            case 15:
              op = "ProxyRegion.setUserAttribute()";
              proxyRegion.setUserAttribute(null);
              break;
            case 16:
              op = "ProxyRegion.unregisterInterestRegex()";
              proxyRegion.unregisterInterestRegex("*");
              break;
            // Attempt FunctionService.onRegion/onServer/s(pool) and expect an
            // exception, fail otherwise.
            case 17:
              op = "FunctionService.onRegion()";
              FunctionService.onRegion(realRegion);
              break;
            case 18:
              op = "FunctionService.onServer(pool)";
              FunctionService.onServer(pool);
              break;
            case 19:
              op = "FunctionService.onServers(pool)";
              FunctionService.onServers(pool);
              break;
            // Attempt
            // QueryService.newQuery().execute()/newCq().execute/executeWithInitialResults()
            case 20:
              op = "QueryService.newQuery.execute()";
              Query query = pool.getQueryService().newQuery(
                  "SELECT * FROM /" + SecurityTestUtil.regionName);
              query.execute();
              break;
            case 21:
              op = "QueryService.newCq.execute()";
              CqQuery cqQuery = pool.getQueryService().newCq(
                  "SELECT * FROM /" + SecurityTestUtil.regionName,
                  new CqAttributesFactory().create());
              try {
                cqQuery.execute();
              } catch (CqException ce) {
                // The exception of interest is the wrapped cause.
                throw causeOrSelf(ce);
              }
              break;
            case 22:
              op = "QueryService.newCq.executeWithInitialResults()";
              cqQuery = pool.getQueryService().newCq(
                  "SELECT * FROM /" + SecurityTestUtil.regionName,
                  new CqAttributesFactory().create());
              try {
                cqQuery.executeWithInitialResults();
              } catch (CqException ce) {
                // The exception of interest is the wrapped cause.
                throw causeOrSelf(ce);
              }
              break;
            // Attempt ProxyQueryService.getIndex/createIndex/removeIndex() and
            // expect an exception, fail otherwise.
            case 23:
              op = "ProxyQueryService().getIndexes()";
              SecurityTestUtil.proxyCaches[0].getQueryService()
                  .getIndexes(null);
              break;
            case 24:
              op = "ProxyQueryService().createIndex()";
              SecurityTestUtil.proxyCaches[0].getQueryService().createIndex(
                  null, null, null );
              break;
            case 25:
              op = "ProxyQueryService().removeIndexes()";
              SecurityTestUtil.proxyCaches[0].getQueryService().removeIndexes();
              break;
            case 26:
              op = "ProxyRegion.localDestroy()";
              proxyRegion.localDestroy("key");
              break;
            case 27:
              op = "ProxyRegion.localInvalidate()";
              proxyRegion.localInvalidate("key");
              break;
            default:
              fail("Unknown op code: " + i);
              break;
          }
        } catch (UnsupportedOperationException uoe) {
          Log.getLogWriter().info(op + ": Got expected exception: " + uoe);
          success = true;
        } catch (Exception e) {
          fail("Got unexpected exception while doing " + op, e);
        }
        if (!success) {
          fail("Did not get exception while doing " + op);
        }
      }
    }
  }

  /** Closes the caches in the client VMs first, then in the server VMs. */
  @Override
  public void tearDown2() throws Exception {
    super.tearDown2();
    // close the clients first
    client1.invoke(SecurityTestUtil.class, "closeCache");
    client2.invoke(SecurityTestUtil.class, "closeCache");
    // then close the servers
    server1.invoke(SecurityTestUtil.class, "closeCache");
    server2.invoke(SecurityTestUtil.class, "closeCache");
  }

}
// Header of the next file in this patch (its added lines follow):
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
// new file mode 100644
// index 0000000..99c55b1
// --- /dev/null
// +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
// @@ -0,0 +1,484 @@
+/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.security; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Random; + +import security.AuthzCredentialGenerator; +import security.CredentialGenerator; + +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.operations.OperationContext.OperationCode; +import com.gemstone.gemfire.cache.query.CqAttributes; +import com.gemstone.gemfire.cache.query.CqAttributesFactory; +import com.gemstone.gemfire.cache.query.CqListener; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener; +import com.gemstone.gemfire.cache.query.internal.cq.ClientCQImpl; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePort; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.logging.InternalLogWriter; + +import dunit.Host; +import dunit.SerializableRunnable; + +/** + * @author ashetkar + * + */ +public class MultiuserDurableCQAuthzDUnitTest extends + ClientAuthorizationTestBase { + + public static final Map<String, String> cqNameToQueryStrings = new HashMap<String, String>(); + + static { + cqNameToQueryStrings.put("CQ_0", "SELECT * FROM "); + cqNameToQueryStrings.put("CQ_1", "SELECT * FROM "); + } + + public MultiuserDurableCQAuthzDUnitTest(String name) { + 
super(name); + } + + public void setUp() throws Exception { + super.setUp(); + getSystem(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + getSystem(); + } + }); + + final Host host = Host.getHost(0); + server1 = host.getVM(0); + server2 = host.getVM(1); + client1 = host.getVM(2); + client2 = host.getVM(3); + + server1.invoke(SecurityTestUtil.class, "registerExpectedExceptions", + new Object[] { serverExpectedExceptions }); + server2.invoke(SecurityTestUtil.class, "registerExpectedExceptions", + new Object[] { serverExpectedExceptions }); + client2.invoke(SecurityTestUtil.class, "registerExpectedExceptions", + new Object[] { clientExpectedExceptions }); + SecurityTestUtil.registerExpectedExceptions(clientExpectedExceptions); + } + + public void testCQForDurableClientsWithDefaultClose() throws Exception { + /* + * 1. Start a server. + * 2. Start a durable client in multiuser secure mode. + * 3. Create two users registering unique durable CQs on server. + * 4. Invoke GemFireCache.close() at client. + * 5. Put some events on server satisfying both the CQs. + * 6. Up the client and the two users. + * 7. Confirm that the users receive the events which were enqueued at server while they were away. + * 8. Same for ProxyCache.close() + */ + Integer numOfUsers = 2; + Integer numOfPuts = 5; + Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE}; + + doTest(numOfUsers, numOfPuts, postAuthzAllowed, + getXmlAuthzGenerator(), null); + } + + public void testCQForDurableClientsWithCloseKeepAliveTrue() throws Exception { + /* + * 1. Start a server. + * 2. Start a durable client in multiuser secure mode. + * 3. Create two users registering unique durable CQs on server. + * 4. Invoke GemFireCache.close(true) at client. + * 5. Put some events on server satisfying both the CQs. + * 6. Up the client and the two users. + * 7. Observe the behaviour. + * 8.
Same for ProxyCache.close(true) + */ + Integer numOfUsers = 2; + Integer numOfPuts = 5; + Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE}; + + doTest(numOfUsers, numOfPuts, postAuthzAllowed, + getXmlAuthzGenerator(), Boolean.TRUE); + } + + public void testCQForDurableClientsWithCloseKeepAliveFalse() throws Exception { + /* + * 1. Start a server. + * 2. Start a durable client in multiuser secure mode. + * 3. Create two users registering unique durable CQs on server. + * 4. Invoke GemFireCache.close(false) at client. + * 5. Put some events on server satisfying both the CQs. + * 6. Up the client and the two users. + * 7. Observe the behaviour. + * 8. Same for ProxyCache.close(false) + */ + Integer numOfUsers = 2; + Integer numOfPuts = 5; + Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE}; + + doTest(numOfUsers, numOfPuts, postAuthzAllowed, + getXmlAuthzGenerator(), Boolean.FALSE); + } + + private void doTest(Integer numOfUsers, Integer numOfPuts, + Boolean[] postAuthzAllowed, AuthzCredentialGenerator gen, Boolean keepAlive) + throws Exception { + CredentialGenerator cGen = gen.getCredentialGenerator(); + Properties extraAuthProps = cGen.getSystemProperties(); + Properties javaProps = cGen.getJavaProperties(); + Properties extraAuthzProps = gen.getSystemProperties(); + String authenticator = cGen.getAuthenticator(); + String accessor = gen.getAuthorizationCallback(); + String authInit = cGen.getAuthInit(); + TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(gen); + + Properties serverProps = buildProperties(authenticator, accessor, true, + extraAuthProps, extraAuthzProps); + + Properties opCredentials; + cGen = tgen.getCredentialGenerator(); + Properties javaProps2 = null; + if (cGen != null) { + javaProps2 = cGen.getJavaProperties(); + } + + int[] indices = new int[numOfPuts]; + for (int index = 0; index < numOfPuts; ++index) { + indices[index] = index; + } + + Random rnd = new Random(); + Properties[] 
authProps = new Properties[numOfUsers]; + String durableClientId = "multiuser_durable_client_1"; + Properties client2Credentials = null; + for (int i = 0; i < numOfUsers; i++) { + int rand = rnd.nextInt(100) + 1; + if (postAuthzAllowed[i]) { + opCredentials = tgen.getAllowedCredentials(new OperationCode[] { + OperationCode.EXECUTE_CQ, OperationCode.GET}, // For callback, GET should be allowed + new String[] {regionName}, indices, rand); + } else { + opCredentials = tgen.getDisallowedCredentials(new OperationCode[] { + OperationCode.GET}, // For callback, GET should be disallowed + new String[] {regionName}, indices, rand); + } + authProps[i] = SecurityTestUtil.concatProperties(new Properties[] { + opCredentials, extraAuthProps, extraAuthzProps}); + + if (client2Credentials == null) { + client2Credentials = tgen.getAllowedCredentials(new OperationCode[] { + OperationCode.PUT}, + new String[] {regionName}, indices, rand); + } + } + + // Get ports for the servers + Integer port1 = new Integer(AvailablePort + .getRandomAvailablePort(AvailablePort.SOCKET)); + Integer port2 = new Integer(AvailablePort + .getRandomAvailablePort(AvailablePort.SOCKET)); + Integer locatorPort = new Integer(AvailablePort + .getRandomAvailablePort(AvailablePort.SOCKET)); + // Close down any running servers + server1.invoke(SecurityTestUtil.class, "closeCache"); + server2.invoke(SecurityTestUtil.class, "closeCache"); + + server1.invoke(MultiuserDurableCQAuthzDUnitTest.class, + "createServerCache", new Object[] {serverProps, javaProps, locatorPort, port1}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, + "createClientCache", new Object[] {javaProps2, authInit, authProps, + new Integer[] {port1, port2}, numOfUsers, durableClientId, postAuthzAllowed}); + +// client2.invoke(SecurityTestUtil.class, "createCacheClient", +// new Object[] {authInit, client2Credentials, javaProps2, +// new Integer[] {port1, port2}, null, SecurityTestUtil.NO_EXCEPTION}); + + 
client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ", + new Object[] {numOfUsers, Boolean.TRUE}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ", + new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts, + new String[numOfUsers]}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "readyForEvents"); + + if (keepAlive == null) { + client1.invoke(SecurityTestUtil.class, "closeCache"); + } else { + client1.invoke(SecurityTestUtil.class, "closeCache", + new Object[] {keepAlive}); + } + + server1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "doPuts", + new Object[] {numOfPuts, Boolean.TRUE/* put last key */}); + + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, + "createClientCache", new Object[] {javaProps2, authInit, authProps, + new Integer[] {port1, port2}, numOfUsers, durableClientId, postAuthzAllowed}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ", + new Object[] {numOfUsers, Boolean.TRUE}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ", + new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts, + new String[numOfUsers]}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "readyForEvents"); + + if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) { + // Don't wait as no user is authorized to receive cq events. + Thread.sleep(1000); + } else { + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "waitForLastKey", + new Object[] {Integer.valueOf(0), Boolean.TRUE}); + } + Integer numOfCreates = (keepAlive == null) ? 0 + : (keepAlive) ? 
(numOfPuts + 1/* last key */) : 0; + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "checkCQListeners", + new Object[] {numOfUsers, postAuthzAllowed, numOfCreates, 0}); + + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "proxyCacheClose", + new Object[] {new Integer[] {0, 1}, keepAlive}); + + client1.invoke(SecurityTestUtil.class, "createProxyCache", + new Object[] {new Integer[] {0, 1}, authProps}); + + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ", + new Object[] {numOfUsers, Boolean.TRUE}); + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ", + new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts, + new String[numOfUsers]}); + + server1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "doPuts", + new Object[] {numOfPuts, Boolean.TRUE/* put last key */}); + + if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) { + // Don't wait as no user is authorized to receive cq events. + Thread.sleep(1000); + } else { + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "waitForLastKey", + new Object[] {Integer.valueOf(0), Boolean.FALSE}); + } + Integer numOfUpdates = numOfPuts + 1; + client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "checkCQListeners", + new Object[] {numOfUsers, postAuthzAllowed, 0, numOfUpdates}); + } + + public static void createServerCache(Properties serverProps, + Properties javaProps, Integer locatorPort, Integer serverPort) { + SecurityTestUtil.createCacheServer((Properties)serverProps, javaProps, + locatorPort, null, serverPort, Boolean.TRUE, new Integer( + SecurityTestUtil.NO_EXCEPTION)); + } + + public static void createClientCache(Properties javaProps, + String authInit, Properties[] authProps, Integer ports[], + Integer numOfUsers, Boolean[] postAuthzAllowed) { + SecurityTestUtil.createCacheClientForMultiUserMode(numOfUsers, authInit, + authProps, javaProps, ports, null, Boolean.FALSE, + SecurityTestUtil.NO_EXCEPTION); + } + + public static void readyForEvents() { 
+ GemFireCacheImpl.getInstance().readyForEvents(); + } + + public static void createClientCache(Properties javaProps, + String authInit, Properties[] authProps, Integer ports[], + Integer numOfUsers, String durableId, Boolean[] postAuthzAllowed) { + SecurityTestUtil.createCacheClientForMultiUserMode(numOfUsers, authInit, + authProps, javaProps, ports, null, Boolean.FALSE, durableId, + SecurityTestUtil.NO_EXCEPTION); + } + + public static void createCQ(Integer num) { + createCQ(num, false); + } + + public static void createCQ(Integer num, Boolean isDurable) { + for (int i = 0; i < num; i++) { + QueryService cqService = SecurityTestUtil.proxyCaches[i].getQueryService(); + String cqName = "CQ_" + i; + String queryStr = cqNameToQueryStrings.get(cqName) + + SecurityTestUtil.proxyCaches[i].getRegion(regionName).getFullPath(); + // Create CQ Attributes. + CqAttributesFactory cqf = new CqAttributesFactory(); + CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())}; + ((CqQueryTestListener)cqListeners[0]).cqName = cqName; + + cqf.initCqListeners(cqListeners); + CqAttributes cqa = cqf.create(); + + // Create CQ. + try { + CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa, isDurable); + assertTrue("newCq() state mismatch", cq1.getState().isStopped()); + } catch (Exception ex) { + AssertionError err = new AssertionError("Failed to create CQ " + cqName + + " . 
"); + err.initCause(ex); + getLogWriter().info("CqService is :" + cqService, err); + throw err; + } + } + } + + public static void executeCQ(Integer num, Boolean[] initialResults, + Integer expectedResultsSize, String[] expectedErr) { + InternalLogWriter logWriter = InternalDistributedSystem.getStaticInternalLogWriter(); + for (int i = 0; i < num; i++) { + try { + if (expectedErr[i] != null) { + logWriter.info( + "<ExpectedException action=add>" + expectedErr[i] + + "</ExpectedException>"); + } + CqQuery cq1 = null; + String cqName = "CQ_" + i; + String queryStr = cqNameToQueryStrings.get(cqName) + + SecurityTestUtil.proxyCaches[i].getRegion(regionName) + .getFullPath(); + QueryService cqService = SecurityTestUtil.proxyCaches[i] + .getQueryService(); + + // Get CqQuery object. + try { + cq1 = cqService.getCq(cqName); + if (cq1 == null) { + getLogWriter().info( + "Failed to get CqQuery object for CQ name: " + cqName); + fail("Failed to get CQ " + cqName); + } else { + getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName()); + assertTrue("newCq() state mismatch", cq1.getState().isStopped()); + } + } catch (Exception ex) { + getLogWriter().info("CqService is :" + cqService); + getLogWriter().error(ex); + AssertionError err = new AssertionError("Failed to execute CQ " + + cqName); + err.initCause(ex); + throw err; + } + + if (initialResults[i]) { + SelectResults cqResults = null; + + try { + cqResults = cq1.executeWithInitialResults(); + } catch (Exception ex) { + getLogWriter().info("CqService is: " + cqService); + ex.printStackTrace(); + AssertionError err = new AssertionError("Failed to execute CQ " + + cqName); + err.initCause(ex); + throw err; + } + getLogWriter().info("initial result size = " + cqResults.size()); + assertTrue("executeWithInitialResults() state mismatch", cq1 + .getState().isRunning()); + if (expectedResultsSize >= 0) { + assertEquals("unexpected results size", expectedResultsSize + .intValue(), cqResults.size()); + } + } else { + try { + 
cq1.execute(); + } catch (Exception ex) { + AssertionError err = new AssertionError("Failed to execute CQ " + + cqName); + err.initCause(ex); + // Only log failures that were not announced as expected for this CQ. + if (expectedErr[i] == null) { + getLogWriter().info("CqService is: " + cqService, err); + } + throw err; + } + assertTrue("execute() state mismatch", cq1.getState().isRunning()); + } + } finally { + if (expectedErr[i] != null) { + logWriter.info( + "<ExpectedException action=remove>" + expectedErr[i] + + "</ExpectedException>"); + } + } + } + } + + public static void doPuts(Integer num, Boolean putLastKey) { + Region region = GemFireCacheImpl.getInstance().getRegion(regionName); + for (int i = 0; i < num; i++) { + region.put("CQ_key"+i, "CQ_value"+i); + } + if (putLastKey) { + region.put("LAST_KEY", "LAST_KEY"); + } + } + + public static void putLastKey() { + Region region = GemFireCacheImpl.getInstance().getRegion(regionName); + region.put("LAST_KEY", "LAST_KEY"); + } + + /** + * Waits on the listener of the CQ named "CQ_" + cqIndex for the LAST_KEY event, + * delegating to waitForCreated or waitForUpdated of CqQueryTestListener. + * + * @param cqIndex index of the proxy cache / CQ whose listener is awaited + * @param isCreate true to wait via waitForCreated, false to wait via waitForUpdated + */ + public static void waitForLastKey(Integer cqIndex, Boolean isCreate) { + String cqName = "CQ_" + cqIndex; + QueryService qService = SecurityTestUtil.proxyCaches[cqIndex].getQueryService(); + ClientCQImpl cqQuery = (ClientCQImpl)qService.getCq(cqName); + // createCQ() registers exactly one listener per CQ, so it always sits at + // index 0 of the listener array; indexing by cqIndex would run past the + // end of the array for any CQ other than CQ_0. 
+ CqQueryTestListener listener = (CqQueryTestListener)cqQuery.getCqListeners()[0]; + if (isCreate) { + listener.waitForCreated("LAST_KEY"); + } else { + listener.waitForUpdated("LAST_KEY"); + } + } + + public static void checkCQListeners(Integer numOfUsers, + Boolean[] expectedListenerInvocation, Integer createEventsSize, + Integer updateEventsSize) { + for (int i = 0; i < numOfUsers; i++) { + String cqName = "CQ_" + i; + QueryService qService = SecurityTestUtil.proxyCaches[i].getQueryService(); + ClientCQImpl cqQuery = (ClientCQImpl)qService.getCq(cqName); + if (expectedListenerInvocation[i]) { + for (CqListener listener : cqQuery.getCqListeners()) { + assertEquals(createEventsSize.intValue(), + ((CqQueryTestListener)listener).getCreateEventCount()); + 
assertEquals(updateEventsSize.intValue(), + ((CqQueryTestListener)listener).getUpdateEventCount()); + } + } else { + for (CqListener listener : cqQuery.getCqListeners()) { + assertEquals(0, ((CqQueryTestListener)listener).getTotalEventCount()); + } + } + } + } + + public static void proxyCacheClose(Integer[] userIndices) { + proxyCacheClose(userIndices, null); + } + + public static void proxyCacheClose(Integer[] userIndices, Boolean keepAliveFlags) { + if (keepAliveFlags != null) { + for (int i : userIndices) { + SecurityTestUtil.proxyCaches[i].close(keepAliveFlags); + } + } else { + for (int i : userIndices) { + SecurityTestUtil.proxyCaches[i].close(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/.keepme ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/.keepme b/gemfire-cq/src/test/resources/.keepme new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt new file mode 100644 index 0000000..c55df1d --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt @@ -0,0 +1,2 @@ +# e.g., +#com/gemstone/gemfire/cache/query/CqAttributesFactory http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt new file mode 100644 index 0000000..6c40e24 --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt @@ -0,0 +1,21 @@ +# This is a list of classes excluded due to open bugs about their having +# incompatible changes. There should be no entries in this file at the +# time of a product release. + +# Each entry should be a bug number followed by a comma and the +# full class name. The package components can be delimited with a period +# or a comma. Don't include ".class" or ".java" at the end of the name. + +# example: 50174,com/gemstone/gemfire/distributed/internal/StartupResponseWithVersionMessage +# example: 50175,com.gemstone.org.jgroups.Message$Header + + +# ~~~~~~~~~~~~~~~~~~~ DataSerializables ~~~~~~~~~~~~~~~~~~~~~~~~~ +# these are failures from testDataSerializables + + +# ~~~~~~~~~~~~~~~~~~~ Serializables ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# these are failures from testSerializables + + + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt new file mode 100644 index 0000000..eb9d8d6 --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt @@ -0,0 +1,4 @@ +com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl,2 +fromData,64,2ab40046594dc22ab400462bb80099b6009ab6007f2cc3a700084e2cc32dbf2a2bb8009bb6009cb5009d2a2bb8009eb5005d2a2bb9009f0100b800a0b50008b1 +toData,47,2ab40046b60047b800a12bb800a22ab4009db8007c2bb800a32ab4005d2bb800a42b2ab40008b600a5b900a60300b1 + 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt new file mode 100755 index 0000000..ddd9718 --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt @@ -0,0 +1 @@ +com/gemstone/gemfire/cache/query/internal/cq/CqConflatable,true,-7215022132135862557,conflate:boolean,id:com/gemstone/gemfire/internal/cache/EventID,key:java/lang/Object,regionname:java/lang/String,value:java/lang/Object http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml new file mode 100644 index 0000000..afde78f --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml @@ -0,0 +1,21 @@ +<?xml version="1.0"?> + <!DOCTYPE client-cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.6//EN" + "http://www.gemstone.com/dtd/cache6_6.dtd"> + + +<client-cache> + <pool + name="client" + subscription-enabled="true" + load-conditioning-interval="60000" + read-timeout="30000" retry-attempts="5" > + <server host="localhost" port="10188" /> + </pool> + + <region name="testReadyForEventsNotCalledImplicitlyWithCacheXML_region" > + <region-attributes 
id="testReadyForEventsNotCalledImplicitlyWithCacheXML_region" statistics-enabled="true" pool-name="client" refid="PROXY"> + <cache-listener><class-name>com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil$ControlListener</class-name></cache-listener> + </region-attributes> + </region> +</client-cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml new file mode 100644 index 0000000..2d58506 --- /dev/null +++ b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml @@ -0,0 +1,16 @@ +<?xml version="1.0"?> +<!DOCTYPE cache PUBLIC + "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.6//EN" + "http://www.gemstone.com/dtd/cache6_6.dtd"> + +<cache is-server="true"> + <!-- Define this cache server --> + <cache-server port="10188" > + <client-subscription eviction-policy="entry" capacity="1000"/> + </cache-server> + <region name="testReadyForEventsNotCalledImplicitlyWithCacheXML_region"> + <region-attributes data-policy="partition" enable-subscription-conflation="false"> + <partition-attributes redundant-copies="0" total-num-buckets="113"/> + </region-attributes> + </region> +</cache> http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt b/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt new file mode 100644 index 0000000..ea6685c --- /dev/null +++ 
b/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt @@ -0,0 +1,3 @@ +cq/statistics/cqStatsHCT.conf +edgeHosts =1 edgeVMsPerHost = 1 edgeThreadsPerVM = 1 +bridgeHosts =1 bridgeVMsPerHost = 1 bridgeThreadsPerVM = 1 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf b/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf new file mode 100644 index 0000000..4623a5d --- /dev/null +++ b/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf @@ -0,0 +1,100 @@ +hydra.Prms-testRequirement = "Test CQ with a variety of operations with careful validation and serial execution in a bridge configuration"; +hydra.Prms-testDescription = " +This test executes operations on entries on a region and carefully +validates for correctness in the CQs. +The test uses serial round robin; the first thread +in the round does a random operation, then all other threads in the round +verify their view of the operation. After the last thread in the round verifies, +it then becomes the first in the next round, thus the thread doing the random +operation changes for each round. 
+"; + +INCLUDE $JTESTS/hydraconfig/hydraparams1.inc; +INCLUDE $JTESTS/hydraconfig/topology_hct.inc; + +hydra.VmPrms-extraClassPaths = + fcn "hydra.TestConfigFcns.duplicate(\"$GEMFIRE/lib/antlr.jar\", ${bridgeHosts})" ncf; + +THREADGROUP bridgeThreads + totalThreads = fcn + ${bridgeHosts} * ${bridgeVMsPerHost} * ${bridgeThreadsPerVM} + ncf + clientNames = fcn "hydra.TestConfigFcns.generateNames + (\"bridge\", ${bridgeHosts}, true)" + ncf; +THREADGROUP edgeThreads + totalThreads = fcn + ${edgeHosts} * ${edgeVMsPerHost} * ${edgeThreadsPerVM} + ncf + clientNames = fcn "hydra.TestConfigFcns.generateNames + (\"edge\", ${edgeHosts}, true)" + ncf; + +INITTASK taskClass = cq.statistics.CQStatisticsTest taskMethod = HydraTask_initializeBridgeServer + threadGroups = bridgeThreads + runMode = always; + +INITTASK taskClass = cq.statistics.CQStatisticsTest taskMethod = HydraTask_initializeClient + threadGroups = edgeThreads + runMode = always; + +INITTASK taskClass = cq.statistics.CQStatisticsTest taskMethod = HydraTask_verifyCqStatistics + threadGroups = edgeThreads; + +hydra.GemFirePrms-stopSystemsAfterTest = true; + +hydra.Prms-totalTaskTimeSec = 1200; +hydra.Prms-maxResultWaitSec = 600; +hydra.Prms-serialExecution = true; +hydra.Prms-roundRobin = true; + +INCLUDE $JTESTS/util/randomValues.inc; +util.RandomValuesPrms-objectType = byte[]; +util.RandomValuesPrms-elementSize = 1000; +util.RandomValuesPrms-borderCasePercentage = 0; +util.ValueHolderPrms-useExtraObject = true; +hydra.GemFirePrms-conserveSockets = ONEOF true false FOENO; + +hydra.CachePrms-names = cache1; +hydra.CachePrms-searchTimeout = 600; + +hydra.Prms-useFixedRandomInMaster= true; +hydra.RegionPrms-names = clientRegion serverRegion; +hydra.RegionPrms-regionName = testRegion testRegion; +hydra.RegionPrms-scope = local ack; +hydra.RegionPrms-poolName = edgeDescript none; +hydra.RegionPrms-dataPolicy = normal replicate; +hydra.RegionPrms-cacheListeners = util.SilenceListener, hct.BridgeEventListener; 
+hydra.RegionPrms-partitionName = none none; + +cq.CQUtilPrms-numOpsPerTask = 100; + +util.CachePrms-useDeclarativeXmlFile = ONEOF true false FOENO; +util.TestHelperPrms-minTaskGranularitySec = 30; + +cq.CQUtilPrms-serverEntryOperations = ONEOF add add update invalidate get destroy FOENO; +cq.CQUtilPrms-clientEntryOperations = ONEOF add add update get destroy FOENO; +cq.CQUtilPrms-upperThreshold = 500; +cq.CQUtilPrms-upperThresholdServerOperations = ONEOF destroy FOENO; +cq.CQUtilPrms-upperThresholdClientOperations = ONEOF destroy FOENO; +cq.CQUtilPrms-lowerThreshold = 0; +cq.CQUtilPrms-lowerThresholdServerOperations = add; +cq.CQUtilPrms-lowerThresholdClientOperations = add; +cq.CQUtilPrms-queryDepth = 7; +cq.CQUtilPrms-numQueriesPerClientVM = 100; +cq.CQUtilPrms-QueryServicePoolName = qservice; +cq.CQUtilPrms-QueryServiceUsingPool = ONEOF true false FOENO; + +// define the edge clients +hydra.PoolPrms-names = edgeDescript qservice; +hydra.PoolPrms-minConnections = 2; +hydra.PoolPrms-subscriptionEnabled = true; +hydra.PoolPrms-threadLocalConnections = true; +hydra.PoolPrms-readTimeout = 800000; +hydra.PoolPrms-subscriptionRedundancy = RANGE 0 + fcn ${bridgeHosts} * ${bridgeVMsPerHost} ncf + EGNAR; + +// define the bridge servers +hydra.BridgePrms-names = bridge; + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/build.gradle ---------------------------------------------------------------------- diff --git a/gemfire-wan/build.gradle b/gemfire-wan/build.gradle new file mode 100644 index 0000000..eba2539 --- /dev/null +++ b/gemfire-wan/build.gradle @@ -0,0 +1,5 @@ +dependencies { + provided project(':gemfire-core') + provided project(path: ':gemfire-core', configuration: 'testOutput') + provided project(path: ':gemfire-junit', configuration: 'testOutput') +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/.keepme ---------------------------------------------------------------------- diff 
--git a/gemfire-wan/src/main/java/.keepme b/gemfire-wan/src/main/java/.keepme new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java new file mode 100644 index 0000000..15be615 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java @@ -0,0 +1,304 @@ +/*========================================================================= + * Copyright (c) 2002-2014, Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. 
+ *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal; + +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.internal.Version; +import com.gemstone.gemfire.internal.cache.EventID; +import com.gemstone.gemfire.internal.cache.tier.MessageType; +import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage; +import com.gemstone.gemfire.internal.cache.tier.sockets.Message; +import com.gemstone.gemfire.internal.cache.tier.sockets.Part; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher; +import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; + +import java.net.SocketTimeoutException; +import java.util.Iterator; +import java.util.List; + +import org.apache.logging.log4j.Logger; + +public class GatewaySenderBatchOp { + + private static final Logger logger = LogService.getLogger(); + + /** + * Send a list of gateway events to a server to execute + * using connections from the given pool + * to communicate with the server. + * @param con the connection to send the message on. + * @param pool the pool to use to communicate with the server. 
+ * @param events list of gateway events + * @param batchId the ID of this batch + * @param removeFromQueueOnException true if the events should be processed even after some exception + */ + public static void executeOn(Connection con, ExecutablePool pool, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry) + { + AbstractOp op = null; + //System.out.println("Version: "+con.getWanSiteVersion()); + if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) { + op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry); + } else { + // Default should create a batch of server version (ACCEPTOR.VERSION) + op = new GatewaySenderGFEBatchOpImpl(events, batchId, removeFromQueueOnException, con.getDistributedSystemId(), isRetry); + } + pool.executeOn(con, op, true/*timeoutFatal*/); + } + + + public static Object executeOn(Connection con, ExecutablePool pool) + { + AbstractOp op = null; + //System.out.println("Version: "+con.getWanSiteVersion()); + // [sumedh] both cases are now same; why switch-case? 
+ if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) { + op = new GatewaySenderGFEBatchOpImpl(); + } else { + // Default should create a batch of server version (ACCEPTOR.VERSION) + op = new GatewaySenderGFEBatchOpImpl(); + } + return pool.executeOn(con, op, true/*timeoutFatal*/); + } + + private GatewaySenderBatchOp() { + // no instances allowed + } + + static class GatewaySenderGFEBatchOpImpl extends AbstractOp { + + /** + * @throws com.gemstone.gemfire.SerializationException if serialization fails + */ + public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean removeFromQueueOnException, int dsId, boolean isRetry) { + super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events)); + if (isRetry) { + getMessage().setIsRetry(); + } + getMessage().addIntPart(events.size()); + getMessage().addIntPart(batchId); + getMessage().addIntPart(dsId); + getMessage().addBytesPart( + new byte[] { removeFromQueueOnException ? (byte)1 : (byte)0 }); + // Add each event + for (Iterator i = events.iterator(); i.hasNext();) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); + // Add action + int action = event.getAction(); + getMessage().addIntPart(action); + { // Add posDup flag + byte posDupByte = (byte)(event.getPossibleDuplicate()?0x01:0x00); + getMessage().addBytesPart(new byte[] {posDupByte}); + } + if (action >= 0 && action <= 3) { + // 0 = create + // 1 = update + // 2 = destroy + String regionName = event.getRegionPath(); + EventID eventId = event.getEventId(); + Object key = event.getKey(); + Object callbackArg = event.getSenderCallbackArgument(); + + // Add region name + getMessage().addStringPart(regionName); + // Add event id + getMessage().addObjPart(eventId); + // Add key + getMessage().addStringOrObjPart(key); + if (action < 2 /* it is 0 or 1 */) { + byte[] value = event.getSerializedValue(); + byte valueIsObject = event.getValueIsObject();; + // Add value (which is already a serialized byte[]) + 
getMessage().addRawPart(value, (valueIsObject == 0x01)); + } + // Add callback arg if necessary: a one-byte presence flag, followed by + // the argument itself when it is not null + if (callbackArg == null) { + getMessage().addBytesPart(new byte[] {0x00}); + } else { + getMessage().addBytesPart(new byte[] {0x01}); + getMessage().addObjPart(callbackArg); + } + // The version timestamp closes the parts written for an event with action 0..3 + getMessage().addLongPart(event.getVersionTimeStamp()); + } + } + } + + /** + * Creates an op whose message has zero parts. attempt(Connection) recognises + * such an op by its part count and only reads a response for it. + */ + public GatewaySenderGFEBatchOpImpl() { + super(MessageType.GATEWAY_RECEIVER_COMMAND, 0); + } + + /** + * Sends the batch, or only reads a response when the message has no parts. + * The send is bracketed by startAttempt, endSendAttempt and endAttempt; the + * two end calls sit in finally blocks and so run even if attemptSend throws. + * @param cnx the connection to use + * @return the result of attemptRead(Connection) for a zero-part message; + * otherwise the failed flag, which is cleared once attemptSend has + * returned normally + * @throws Exception if the send or the read fails + */ + @Override + public Object attempt(Connection cnx) throws Exception { + if (getMessage().getNumberOfParts() == 0) { + return attemptRead(cnx); + } + // Assume failure until attemptSend has returned + this.failed = true; + this.timedOut = false; + long start = startAttempt(cnx.getStats()); + try { + try { + attemptSend(cnx); + this.failed = false; + } finally { + endSendAttempt(cnx.getStats(), start); + } + } finally { + endAttempt(cnx.getStats(), start); + } + return this.failed; + } + + /** + * Reads the response via attemptReadResponse(Connection) and maintains the + * failed and timedOut flags: failed is cleared on success and also on a + * SocketTimeoutException, where timedOut is set instead; any other exception + * leaves failed set. Every exception is rethrown to the caller. + * @param cnx the connection to read from + * @return whatever attemptReadResponse(Connection) returns + * @throws Exception if reading the response fails + */ + private Object attemptRead(Connection cnx) throws Exception { + this.failed = true; + try { + Object result = attemptReadResponse(cnx); + this.failed = false; + return result; + } catch (SocketTimeoutException ste) { + this.failed = false; + this.timedOut = true; + throw ste; + } catch (Exception e) { + throw e; + } + } + + + /** + * Attempts to read a response to this operation by reading it from the + * given connection, and returning it. + * @param cnx the connection to read the response from + * @return the result of the operation + * or <code>null</code> if the operation has no result. 
+ * @throws Exception if the execute failed + */ + protected Object attemptReadResponse(Connection cnx) throws Exception { + Message msg = createResponseMessage(); + if (msg != null) { + msg.setComms(cnx.getSocket(), cnx.getInputStream(), + cnx.getOutputStream(), + ((ConnectionImpl)cnx).getCommBufferForAsyncRead(), cnx.getStats()); + if (msg instanceof ChunkedMessage) { + try { + return processResponse(msg, cnx); + } finally { + msg.unsetComms(); + // TODO (ashetkar) Handle the case when we fail to read the + // connection id. + processSecureBytes(cnx, msg); + } + } + + // Any other message is received in full before it is processed + try { + msg.recv(); + } finally { + msg.unsetComms(); + processSecureBytes(cnx, msg); + } + return processResponse(msg, cnx); + } + + // createResponseMessage() returned null: nothing to read + return null; + } + + + /** + * Computes the part count of a batch message: the four leading parts plus + * getNumberOfParts() of every event. + * @param events the events of the batch; each element is cast to GatewaySenderEventImpl + * @return the total number of message parts + */ + private static int calcPartCount(List events) { + int numberOfParts = 4; // event count, batchId, dsId and the removeFromQueueOnException flag + for (Iterator i = events.iterator(); i.hasNext();) { + GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next(); + numberOfParts += event.getNumberOfParts(); + } + return numberOfParts; + } + + /** No-op: this op does nothing with secure bytes. */ + @Override + protected void processSecureBytes(Connection cnx, Message message) + throws Exception { + } + + /** Returns false for this op. */ + @Override + protected boolean needsUserId() { + return false; + } + + /** + * Masks the early-ack byte down to its MESSAGE_HAS_SECURE_PART bit and then + * sends the message. + */ + @Override + protected void sendMessage(Connection cnx) throws Exception { + getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & Message.MESSAGE_HAS_SECURE_PART)); + getMessage().send(false); + } + + /** + * Converts the receiver's response into a GatewayAck. + * REPLY: the ack carries the batch id from part 0 and the event count from part 1. + * EXCEPTION: a List payload is wrapped in a BatchException70 and stored in the + * ack instead of being thrown; a Throwable payload is rethrown inside a + * ServerOperationException; any other payload leaves the result null. + * The message is cleared before this method returns or throws. + * @param msg the response message + * @return the GatewayAck, or null as described above + * @throws Exception ServerOperationException for a Throwable payload; an + * InternalGemFireError is raised for any other message type + */ + @Override + protected Object processResponse(Message msg) throws Exception { + GatewayAck ack = null; + try { + // Read the header which describes the type of message following + switch (msg.getMessageType()) { + case MessageType.REPLY: + // Read the chunk + int batchId = msg.getPart(0).getInt(); + int numEvents = msg.getPart(1).getInt(); + ack = new GatewayAck(batchId, numEvents); + break; + case MessageType.EXCEPTION: + Part part0 = msg.getPart(0); + + Object obj = part0.getObject(); + if (obj instanceof List) { + List<BatchException70> l = 
(List<BatchException70>)part0.getObject(); + + if (logger.isDebugEnabled()) { + logger.debug("We got an exception from the GatewayReceiver. MessageType : {} obj :{}", msg.getMessageType(), obj); + } + // don't throw Exception but set it in the Ack + BatchException70 be = new BatchException70(l); + ack = new GatewayAck(be, l.get(0).getBatchId()); + + } else if (obj instanceof Throwable) { + String s = ": While reading Ack from receiver " + + ((Throwable)obj).getMessage(); + throw new ServerOperationException(s, (Throwable)obj); + } + break; + default: + throw new InternalGemFireError( + LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0 + .toLocalizedString(Integer.valueOf(msg.getMessageType()))); + } + } finally { + msg.clear(); + } + return ack; + } + + @Override + protected boolean isErrorResponse(int msgType) { + return false; + } + @Override + protected long startAttempt(ConnectionStats stats) { + return stats.startGatewayBatch(); + } + @Override + protected void endSendAttempt(ConnectionStats stats, long start) { + stats.endGatewayBatchSend(start, hasFailed()); + } + @Override + protected void endAttempt(ConnectionStats stats, long start) { + stats.endGatewayBatch(start, hasTimedOut(), hasFailed()); + } + + @Override + public boolean isGatewaySenderOp() { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java new file mode 100644 index 0000000..d2030b3 --- /dev/null +++ b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java @@ -0,0 +1,34 @@ +/*========================================================================= + * Copyright (c) 2002-2014 Pivotal 
Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.client.internal; + +import java.util.List; + +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.distributed.internal.ServerLocation; + +/** + * Used to send operations from a sender to a receiver. + * Both operations delegate to GatewaySenderBatchOp.executeOn with this + * proxy's pool and a caller-supplied connection. + * @author skumar + * @since 8.1 + */ +public class SenderProxy extends ServerProxy{ + /** + * @param pool the pool handed to the ServerProxy superclass + */ + public SenderProxy(InternalPool pool) { + super(pool); + } + + /** + * Dispatches one batch of events over the given connection. + * @param con the connection to execute on + * @param events the events of the batch + * @param batchId the id of this batch + * @param removeFromQueueOnException passed through unchanged to GatewaySenderBatchOp.executeOn + * @param isRetry passed through unchanged to GatewaySenderBatchOp.executeOn + */ + public void dispatchBatch_NewWAN(Connection con, List events, int batchId, boolean removeFromQueueOnException, boolean isRetry) + { + GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, removeFromQueueOnException, isRetry); + } + + /** + * Executes the two-argument GatewaySenderBatchOp.executeOn on the given connection. + * @param con the connection to execute on + * @return whatever GatewaySenderBatchOp.executeOn(con, pool) returns + */ + public Object receiveAckFromReceiver(Connection con) + { + return GatewaySenderBatchOp.executeOn(con, this.pool); + } +}