http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
new file mode 100644
index 0000000..91df070
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/ClientPostAuthorizationDUnitTest.java
@@ -0,0 +1,390 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.security;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+import security.AuthzCredentialGenerator;
+import security.CredentialGenerator;
+import com.gemstone.gemfire.cache.operations.OperationContext.OperationCode;
+import com.gemstone.gemfire.internal.AvailablePort;
+
+import dunit.Host;
+
+/**
+ * Tests authorization from client to server. This covers the post-process
+ * authorization callbacks that are applied to the values returned by
+ * operations and to notifications, including behavior across server failover.
+ * 
+ * @author sumedh
+ * @since 5.5
+ */
+public class ClientPostAuthorizationDUnitTest extends
+    ClientAuthorizationTestBase {
+
+
+  /**
+   * Creates the test case.
+   *
+   * @param name passed unchanged to the superclass constructor
+   */
+  public ClientPostAuthorizationDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Assigns VMs 0-3 of dunit host 0 to {@code server1}, {@code server2},
+   * {@code client1} and {@code client2}, then registers the expected
+   * exception lists: the server list on both server VMs, the client list on
+   * {@code client2} and, through a direct call, in the VM running this method.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final Host dunitHost = Host.getHost(0);
+    server1 = dunitHost.getVM(0);
+    server2 = dunitHost.getVM(1);
+    client1 = dunitHost.getVM(2);
+    client2 = dunitHost.getVM(3);
+
+    // Name of the static SecurityTestUtil method invoked in the remote VMs.
+    final String registerMethod = "registerExpectedExceptions";
+
+    server1.invoke(SecurityTestUtil.class, registerMethod,
+        new Object[] { serverExpectedExceptions });
+    server2.invoke(SecurityTestUtil.class, registerMethod,
+        new Object[] { serverExpectedExceptions });
+    client2.invoke(SecurityTestUtil.class, registerMethod,
+        new Object[] { clientExpectedExceptions });
+
+    // Same registration, performed directly in this VM.
+    SecurityTestUtil.registerExpectedExceptions(clientExpectedExceptions);
+  }
+
+  // Region: Tests
+
+  /**
+   * Runs blocks of a PUT followed by GET, KEY_SET, QUERY or EXECUTE_CQ checks
+   * for every generator returned by {@code getDummyGeneratorCombos()}.
+   * <p>
+   * The operation table is split into blocks by {@code OPBLOCK_END} markers.
+   * Each block is executed once with {@code server1} running; unless the
+   * block is terminated by {@code OPBLOCK_NO_FAILOVER}, {@code server2} is
+   * then started, {@code server1} is closed and the same block is executed
+   * a second time.
+   */
+  public void testAllPostOps() {
+
+    OperationWithAction[] allOps = {
+        // Test CREATE and verify with a GET
+        new OperationWithAction(OperationCode.PUT),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.CHECK_NOKEY, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.CHECK_NOKEY
+            | OpFlags.CHECK_NOTAUTHZ, 4),
+
+        // OPBLOCK_END indicates end of an operation block that needs to
+        // be executed on each server when doing failover
+        OperationWithAction.OPBLOCK_END,
+
+        // Test UPDATE and verify with a GET
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
+            | OpFlags.USE_NEWVAL, 4),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.USE_NEWVAL, 4),
+        new OperationWithAction(OperationCode.GET, 3,
+            OpFlags.USE_OLDCONN | OpFlags.CHECK_NOKEY | OpFlags.USE_NEWVAL
+                | OpFlags.CHECK_NOTAUTHZ, 4),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Test UPDATE and verify with a KEY_SET
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 6),
+        new OperationWithAction(OperationCode.KEY_SET, 2, OpFlags.NONE, 6),
+        new OperationWithAction(OperationCode.KEY_SET, 3,
+            OpFlags.CHECK_NOTAUTHZ, 6),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Test UPDATE and verify with a QUERY
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
+            | OpFlags.USE_NEWVAL, 7),
+        new OperationWithAction(OperationCode.QUERY, 2, OpFlags.USE_NEWVAL, 7),
+        new OperationWithAction(OperationCode.QUERY, 3, OpFlags.USE_NEWVAL
+            | OpFlags.CHECK_NOTAUTHZ, 7),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Test UPDATE and verify with a EXECUTE_CQ initial results
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),
+        new OperationWithAction(OperationCode.EXECUTE_CQ, 2, OpFlags.NONE, 8),
+        new OperationWithAction(OperationCode.EXECUTE_CQ, 3,
+            OpFlags.CHECK_NOTAUTHZ, 8),
+
+        OperationWithAction.OPBLOCK_END };
+
+    Iterator<?> generators = getDummyGeneratorCombos().iterator();
+    while (generators.hasNext()) {
+      AuthzCredentialGenerator gen =
+          (AuthzCredentialGenerator)generators.next();
+      CredentialGenerator cGen = gen.getCredentialGenerator();
+      Properties extraAuthProps = cGen.getSystemProperties();
+      Properties javaProps = cGen.getJavaProperties();
+      Properties extraAuthzProps = gen.getSystemProperties();
+      String authenticator = cGen.getAuthenticator();
+      String authInit = cGen.getAuthInit();
+      String accessor = gen.getAuthorizationCallback();
+      TestAuthzCredentialGenerator tgen =
+          new TestAuthzCredentialGenerator(gen);
+
+      getLogWriter().info("testAllPostOps: Using authinit: " + authInit);
+      getLogWriter().info(
+          "testAllPostOps: Using authenticator: " + authenticator);
+      getLogWriter().info("testAllPostOps: Using accessor: " + accessor);
+
+      // Start servers with all required properties
+      Properties serverProps = buildProperties(authenticator, accessor, true,
+          extraAuthProps, extraAuthzProps);
+
+      // Get ports for the servers. Nothing is bound between the two lookups,
+      // so the second lookup may return the first port again; retry until
+      // the ports differ because both servers are up at once during failover.
+      int firstPort = AvailablePort
+          .getRandomAvailablePort(AvailablePort.SOCKET);
+      int secondPort = AvailablePort
+          .getRandomAvailablePort(AvailablePort.SOCKET);
+      while (secondPort == firstPort) {
+        secondPort = AvailablePort
+            .getRandomAvailablePort(AvailablePort.SOCKET);
+      }
+      Integer port1 = Integer.valueOf(firstPort);
+      Integer port2 = Integer.valueOf(secondPort);
+
+      // Close down any running servers
+      server1.invoke(SecurityTestUtil.class, "closeCache");
+      server2.invoke(SecurityTestUtil.class, "closeCache");
+
+      // Perform all the ops on the clients
+      List<OperationWithAction> opBlock = new ArrayList<OperationWithAction>();
+      Random rnd = new Random();
+      for (int opNum = 0; opNum < allOps.length; ++opNum) {
+        OperationWithAction currentOp = allOps[opNum];
+        boolean endsBlock = currentOp.equals(OperationWithAction.OPBLOCK_END)
+            || currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER);
+        if (!endsBlock) {
+          // Ordinary operation: record its table index and queue it.
+          currentOp.setOpNum(opNum);
+          opBlock.add(currentOp);
+          continue;
+        }
+        if (opBlock.isEmpty()) {
+          continue;
+        }
+
+        // End of current operation block; start the first server and
+        // execute the operation block
+        server1.invoke(ClientAuthorizationTestBase.class,
+            "createCacheServer", new Object[] {
+                SecurityTestUtil.getLocatorPort(), port1, serverProps,
+                javaProps });
+        server2.invoke(SecurityTestUtil.class, "closeCache");
+        executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
+            extraAuthzProps, tgen, rnd);
+
+        if (!currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER)) {
+          // Failover to the second server and run the block again
+          server2.invoke(ClientAuthorizationTestBase.class,
+              "createCacheServer", new Object[] {
+                  SecurityTestUtil.getLocatorPort(), port2, serverProps,
+                  javaProps });
+          server1.invoke(SecurityTestUtil.class, "closeCache");
+          executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
+              extraAuthzProps, tgen, rnd);
+        }
+        opBlock.clear();
+      }
+    }
+  }
+
+  /**
+   * Runs blocks of REGISTER_INTEREST, PUT, DESTROY, REGION_CLEAR,
+   * REGION_CREATE and REGION_DESTROY operations followed by GET checks, using
+   * the generator returned by {@code getXmlAuthzGenerator()}.
+   * <p>
+   * The operation table is split into blocks by {@code OPBLOCK_END} markers.
+   * Each block is executed once with {@code server1} running and then, after
+   * {@code server2} is started and {@code server1} is closed, a second time.
+   * The final block ends with {@code OPBLOCK_NO_FAILOVER} and is therefore
+   * executed only once.
+   */
+  public void testAllOpsNotifications() {
+
+    OperationWithAction[] allOps = {
+        // Test CREATE and verify with a GET
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_REGEX
+                | OpFlags.REGISTER_POLICY_NONE, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 3, OpFlags.USE_REGEX
+                | OpFlags.REGISTER_POLICY_NONE | OpFlags.USE_NOTAUTHZ, 8),
+        new OperationWithAction(OperationCode.PUT),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 4),
+
+        // OPBLOCK_END indicates end of an operation block that needs to
+        // be executed on each server when doing failover
+        OperationWithAction.OPBLOCK_END,
+
+        // Test UPDATE and verify with a GET
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_REGEX
+                | OpFlags.REGISTER_POLICY_NONE, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 3, OpFlags.USE_REGEX
+                | OpFlags.REGISTER_POLICY_NONE | OpFlags.USE_NOTAUTHZ, 8),
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
+            | OpFlags.USE_NEWVAL, 4),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_NEWVAL, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_NEWVAL | OpFlags.CHECK_FAIL, 4),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Test DESTROY and verify with GET that keys should not exist
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_REGEX, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
+            OpFlags.USE_REGEX | OpFlags.USE_OLDCONN
+                | OpFlags.REGISTER_POLICY_NONE, 8),
+        // registerInterest now clears the keys, so a dummy put to add
+        // those keys back for the case when updates should not come
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
+        new OperationWithAction(OperationCode.DESTROY, 1, OpFlags.USE_OLDCONN,
+            4),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP, 4),
+        // Repopulate the region
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Do REGION_CLEAR and check with GET
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS, 1),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
+            OpFlags.USE_ALL_KEYS | OpFlags.USE_OLDCONN
+                | OpFlags.REGISTER_POLICY_NONE, 1),
+        // registerInterest now clears the keys, so a dummy put to add
+        // those keys back for the case when updates should not come
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
+        new OperationWithAction(OperationCode.REGION_CLEAR, 1,
+            OpFlags.USE_OLDCONN, 1),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.CHECK_FAIL, 8),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP, 8),
+        // Repopulate the region
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN, 8),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Do REGION_CREATE and check with CREATE/GET
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS | OpFlags.ENABLE_DRF,
+            1),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 3, OpFlags.USE_ALL_KEYS | OpFlags.ENABLE_DRF
+                | OpFlags.USE_NOTAUTHZ | OpFlags.REGISTER_POLICY_NONE, 1),
+        new OperationWithAction(OperationCode.REGION_CREATE, 1,
+            OpFlags.ENABLE_DRF, 1),
+        new OperationWithAction(OperationCode.PUT, 1, OpFlags.USE_OLDCONN
+            | OpFlags.USE_SUBREGION, 4),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
+            | OpFlags.NO_CREATE_SUBREGION, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
+            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),
+
+        // Do REGION_DESTROY of the sub-region and check with GET
+        new OperationWithAction(OperationCode.REGION_DESTROY, 1,
+            OpFlags.USE_OLDCONN | OpFlags.USE_SUBREGION, 1),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
+            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.USE_SUBREGION | OpFlags.CHECK_NOKEY
+            | OpFlags.CHECK_EXCEPTION, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.USE_SUBREGION
+            | OpFlags.NO_CREATE_SUBREGION | OpFlags.CHECK_NOREGION, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.USE_SUBREGION | OpFlags.CHECK_NOKEY
+            | OpFlags.CHECK_EXCEPTION, 4),
+
+        OperationWithAction.OPBLOCK_END,
+
+        // Do REGION_DESTROY of the region and check with GET
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.NONE, 8),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST,
+            OperationCode.GET, 2, OpFlags.USE_ALL_KEYS, 1),
+        new OperationWithAction(OperationCode.REGISTER_INTEREST, 3,
+            OpFlags.USE_ALL_KEYS | OpFlags.USE_OLDCONN
+                | OpFlags.REGISTER_POLICY_NONE, 1),
+        // registerInterest now clears the keys, so a dummy put to add
+        // those keys back for the case when updates should not come
+        new OperationWithAction(OperationCode.PUT, 3, OpFlags.USE_OLDCONN, 8),
+        new OperationWithAction(OperationCode.REGION_DESTROY, 1, OpFlags.NONE,
+            1),
+        new OperationWithAction(OperationCode.GET, 2, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP | OpFlags.CHECK_NOREGION, 4),
+        new OperationWithAction(OperationCode.GET, 3, OpFlags.USE_OLDCONN
+            | OpFlags.LOCAL_OP, 4),
+
+        OperationWithAction.OPBLOCK_NO_FAILOVER };
+
+    AuthzCredentialGenerator gen = getXmlAuthzGenerator();
+    getLogWriter().info(
+        "Executing opblocks with credential generator " + gen);
+    CredentialGenerator cGen = gen.getCredentialGenerator();
+    Properties extraAuthProps = cGen.getSystemProperties();
+    Properties javaProps = cGen.getJavaProperties();
+    Properties extraAuthzProps = gen.getSystemProperties();
+    String authenticator = cGen.getAuthenticator();
+    String authInit = cGen.getAuthInit();
+    String accessor = gen.getAuthorizationCallback();
+    TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(gen);
+
+    getLogWriter().info(
+        "testAllOpsNotifications: Using authinit: " + authInit);
+    getLogWriter().info(
+        "testAllOpsNotifications: Using authenticator: " + authenticator);
+    getLogWriter().info(
+        "testAllOpsNotifications: Using accessor: " + accessor);
+
+    // Start servers with all required properties
+    Properties serverProps = buildProperties(authenticator, accessor, true,
+        extraAuthProps, extraAuthzProps);
+
+    // Get ports for the servers. Nothing is bound between the two lookups,
+    // so the second lookup may return the first port again; retry until
+    // the ports differ because both servers are up at once during failover.
+    int firstPort = AvailablePort
+        .getRandomAvailablePort(AvailablePort.SOCKET);
+    int secondPort = AvailablePort
+        .getRandomAvailablePort(AvailablePort.SOCKET);
+    while (secondPort == firstPort) {
+      secondPort = AvailablePort
+          .getRandomAvailablePort(AvailablePort.SOCKET);
+    }
+    Integer port1 = Integer.valueOf(firstPort);
+    Integer port2 = Integer.valueOf(secondPort);
+
+    // Perform all the ops on the clients
+    List<OperationWithAction> opBlock = new ArrayList<OperationWithAction>();
+    Random rnd = new Random();
+    for (int opNum = 0; opNum < allOps.length; ++opNum) {
+      OperationWithAction currentOp = allOps[opNum];
+      boolean endsBlock = currentOp.equals(OperationWithAction.OPBLOCK_END)
+          || currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER);
+      if (!endsBlock) {
+        // Ordinary operation: record its table index and queue it.
+        currentOp.setOpNum(opNum);
+        opBlock.add(currentOp);
+        continue;
+      }
+      if (opBlock.isEmpty()) {
+        continue;
+      }
+
+      // End of current operation block; start the first server and
+      // execute the operation block
+      server1.invoke(ClientAuthorizationTestBase.class,
+          "createCacheServer", new Object[] {
+              SecurityTestUtil.getLocatorPort(), port1, serverProps,
+              javaProps });
+      server2.invoke(SecurityTestUtil.class, "closeCache");
+      executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
+          extraAuthzProps, tgen, rnd);
+
+      if (!currentOp.equals(OperationWithAction.OPBLOCK_NO_FAILOVER)) {
+        // Failover to the second server and run the block again
+        server2.invoke(ClientAuthorizationTestBase.class,
+            "createCacheServer", new Object[] {
+                SecurityTestUtil.getLocatorPort(), port2, serverProps,
+                javaProps });
+        server1.invoke(SecurityTestUtil.class, "closeCache");
+        executeOpBlock(opBlock, port1, port2, authInit, extraAuthProps,
+            extraAuthzProps, tgen, rnd);
+      }
+      opBlock.clear();
+    }
+  }
+
+  // End Region: Tests
+
+  /**
+   * Runs the superclass tear-down, then closes the caches: both client VMs
+   * and this VM first, the two server VMs afterwards.
+   */
+  @Override
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+
+    // Name of the static SecurityTestUtil method invoked in the remote VMs.
+    final String closeMethod = "closeCache";
+
+    // Clients (remote VMs and this VM) go first...
+    client1.invoke(SecurityTestUtil.class, closeMethod);
+    client2.invoke(SecurityTestUtil.class, closeMethod);
+    SecurityTestUtil.closeCache();
+
+    // ...and the servers last.
+    server1.invoke(SecurityTestUtil.class, closeMethod);
+    server2.invoke(SecurityTestUtil.class, closeMethod);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
new file mode 100644
index 0000000..9c5d18a
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserAPIDUnitTest.java
@@ -0,0 +1,382 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.security;
+
+import hydra.Log;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLHandshakeException;
+
+import security.CredentialGenerator;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.client.Pool;
+import com.gemstone.gemfire.cache.execute.FunctionService;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.Query;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.PoolManagerImpl;
+
+import dunit.Host;
+import dunit.VM;
+import security.DummyCredentialGenerator;
+
+public class MultiuserAPIDUnitTest extends ClientAuthorizationTestBase {
+
+  /**
+   * Creates the test case.
+   *
+   * @param name passed unchanged to the superclass constructor
+   */
+  public MultiuserAPIDUnitTest(String name) {
+    super(name);
+  }
+
+  // The four VMs used by this test; assigned from dunit host 0 in setUp().
+  private VM server1 = null;
+
+  private VM server2 = null;
+
+  private VM client1 = null;
+
+  private VM client2 = null;
+
+  // Exception class names registered as expected on both server VMs in
+  // setUp().
+  private static final String[] serverExpectedExceptions = {
+      AuthenticationRequiredException.class.getName(),
+      AuthenticationFailedException.class.getName(),
+      GemFireSecurityException.class.getName(),
+      ClassNotFoundException.class.getName(), IOException.class.getName(),
+      SSLException.class.getName(), SSLHandshakeException.class.getName()};
+
+  // Exception class names registered as expected on both client VMs in
+  // setUp().
+  private static final String[] clientExpectedExceptions = {
+      AuthenticationRequiredException.class.getName(),
+      AuthenticationFailedException.class.getName(),
+      SSLHandshakeException.class.getName()};
+
+  /**
+   * Assigns VMs 0-3 of dunit host 0 to {@code server1}, {@code server2},
+   * {@code client1} and {@code client2}, then registers the server
+   * expected-exception list on both servers and the client list on both
+   * clients.
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    final Host dunitHost = Host.getHost(0);
+    server1 = dunitHost.getVM(0);
+    server2 = dunitHost.getVM(1);
+    client1 = dunitHost.getVM(2);
+    client2 = dunitHost.getVM(3);
+
+    // Name of the static SecurityTestUtil method invoked in the remote VMs.
+    final String registerMethod = "registerExpectedExceptions";
+
+    // Same order as before: server1, server2, client1, client2.
+    for (VM server : new VM[] {server1, server2}) {
+      server.invoke(SecurityTestUtil.class, registerMethod,
+          new Object[] {serverExpectedExceptions});
+    }
+    for (VM client : new VM[] {client1, client2}) {
+      client.invoke(SecurityTestUtil.class, registerMethod,
+          new Object[] {clientExpectedExceptions});
+    }
+  }
+
+  /**
+   * Starts a cache server through {@code SecurityTestUtil.createCacheServer},
+   * expecting {@code SecurityTestUtil.NO_EXCEPTION}. Parameters are typed as
+   * {@code Object} and cast here; the method is called by name through
+   * {@code VM.invoke} from {@code setUpVMs}.
+   *
+   * @param dsPort cast to {@code Integer} and forwarded
+   * @param locatorString cast to {@code String} and forwarded
+   * @param authenticator if non-null, its {@code toString()} is stored under
+   *        {@code DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME}
+   * @param extraProps {@code Properties} to use as the auth properties, or
+   *        null for a fresh empty set; a non-null instance is modified in
+   *        place when {@code authenticator} is non-null
+   * @param javaProps forwarded unchanged
+   * @return the value returned by {@code SecurityTestUtil.createCacheServer}
+   */
+  public static Integer createCacheServer(Object dsPort, Object locatorString,
+      Object authenticator, Object extraProps, Object javaProps) {
+
+    final Properties authProps =
+        (extraProps == null) ? new Properties() : (Properties)extraProps;
+    if (authenticator != null) {
+      authProps.setProperty(
+          DistributionConfig.SECURITY_CLIENT_AUTHENTICATOR_NAME,
+          authenticator.toString());
+    }
+    // Integer.valueOf replaces the deprecated boxing constructor.
+    return SecurityTestUtil.createCacheServer(authProps, javaProps,
+        (Integer)dsPort, (String)locatorString, null,
+        Integer.valueOf(SecurityTestUtil.NO_EXCEPTION));
+  }
+
+  /**
+   * Converts {@code authInit} and {@code multiUserMode} to strings and
+   * delegates to {@code SecurityTestUtil.createCacheClient}; a null
+   * {@code authInit} is passed on as null.
+   */
+  private static void createCacheClient(Object authInit, Properties authProps,
+      Properties javaProps, Integer[] ports, Object numConnections,
+      Boolean multiUserMode, Integer expectedResult) {
+
+    String authInitName = null;
+    if (authInit != null) {
+      authInitName = authInit.toString();
+    }
+    final String multiUserFlag = multiUserMode.toString();
+
+    SecurityTestUtil.createCacheClient(authInitName, authProps, javaProps,
+        ports, numConnections, multiUserFlag, expectedResult);
+  }
+
+  /**
+   * Variant taking {@code Object}-typed properties and two separate ports;
+   * called by name through {@code VM.invoke} from {@code setUpVMs}. Casts the
+   * properties, packs the ports into an array in the order given and
+   * delegates to the private overload.
+   */
+  public static void createCacheClient(Object authInit, Object authProps,
+      Object javaProps, Integer port1, Integer port2, Object numConnections,
+      Boolean multiUserMode, Integer expectedResult) {
+
+    final Properties typedAuthProps = (Properties)authProps;
+    final Properties typedJavaProps = (Properties)javaProps;
+    final Integer[] serverPorts = new Integer[] {port1, port2};
+
+    createCacheClient(authInit, typedAuthProps, typedJavaProps, serverPorts,
+        numConnections, multiUserMode, expectedResult);
+  }
+
+  /**
+   * Looks up the region named {@code SecurityTestUtil.regionName} in the
+   * cache returned by {@code SecurityTestUtil.getCache()}, asserts that it
+   * exists and registers interest with the regular expression {@code ".*"}.
+   */
+  public static void registerAllInterest() {
+    final String testRegionName = SecurityTestUtil.regionName;
+    final Region testRegion = SecurityTestUtil.getCache().getRegion(
+        testRegionName);
+    assertNotNull(testRegion);
+    testRegion.registerInterestRegex(".*");
+  }
+
+  /**
+   * Starts a cache server in {@code server1} and {@code server2}, then a
+   * cache client in {@code client1} that is given both server ports.
+   *
+   * @param gen supplies the authenticator, auth-init, system/java properties
+   *        and the client credentials
+   * @param multiUser forwarded to {@code createCacheClient} as the
+   *        multi-user mode of the client
+   */
+  private void setUpVMs(CredentialGenerator gen, Boolean multiUser) {
+    Properties extraProps = gen.getSystemProperties();
+    Properties javaProps = gen.getJavaProperties();
+    String authenticator = gen.getAuthenticator();
+    String authInit = gen.getAuthInit();
+
+    // Log prefix names this method; it previously carried a prefix copied
+    // from an unrelated test ("testValidCredentials").
+    getLogWriter().info("setUpVMs: Using scheme: " + gen.classCode());
+    getLogWriter().info("setUpVMs: Using authenticator: " + authenticator);
+    getLogWriter().info("setUpVMs: Using authinit: " + authInit);
+
+    // Start the servers
+    Integer locPort1 = SecurityTestUtil.getLocatorPort();
+    Integer locPort2 = SecurityTestUtil.getLocatorPort();
+    String locString = SecurityTestUtil.getLocatorString();
+    Integer port1 = (Integer)server1.invoke(MultiuserAPIDUnitTest.class,
+        "createCacheServer", new Object[] {locPort1, locString, authenticator,
+            extraProps, javaProps});
+    Integer port2 = (Integer)server2.invoke(MultiuserAPIDUnitTest.class,
+        "createCacheServer", new Object[] {locPort2, locString, authenticator,
+            extraProps, javaProps});
+
+    // Start the clients with valid credentials
+    Properties credentials1 = gen.getValidCredentials(1);
+    Properties javaProps1 = gen.getJavaProperties();
+    getLogWriter().info(
+        "setUpVMs: For first client credentials: " + credentials1
+            + " : " + javaProps1);
+    // NOTE(review): the second client's credentials are only logged; no
+    // cache client is started in client2 by this method - confirm intent.
+    Properties credentials2 = gen.getValidCredentials(2);
+    Properties javaProps2 = gen.getJavaProperties();
+    getLogWriter().info(
+        "setUpVMs: For second client credentials: " + credentials2
+            + " : " + javaProps2);
+    // Integer.valueOf replaces the deprecated boxing constructor.
+    client1.invoke(MultiuserAPIDUnitTest.class, "createCacheClient",
+        new Object[] {authInit, credentials1, javaProps1, port1, port2, null,
+            multiUser, Integer.valueOf(SecurityTestUtil.NO_EXCEPTION)});
+  }
+
+  /**
+   * Starts the servers and a client with multi-user mode {@code FALSE}, then
+   * runs {@code verifyDisallowedOps(FALSE)} in {@code client1}.
+   */
+  public void testSingleUserUnsupportedAPIs() {
+    final Boolean multiUser = Boolean.FALSE;
+    setUpVMs(new DummyCredentialGenerator(), multiUser);
+    client1.invoke(MultiuserAPIDUnitTest.class, "verifyDisallowedOps",
+        new Object[] {multiUser});
+  }
+
+  /**
+   * Starts the servers and a client with multi-user mode {@code TRUE}, then
+   * runs {@code verifyDisallowedOps(TRUE)} in {@code client1}.
+   */
+  public void testMultiUserUnsupportedAPIs() {
+    final Boolean multiUser = Boolean.TRUE;
+    setUpVMs(new DummyCredentialGenerator(), multiUser);
+    client1.invoke(MultiuserAPIDUnitTest.class, "verifyDisallowedOps",
+        new Object[] {multiUser});
+  }
+
+  /**
+   * Verifies that APIs that must not be usable in the given mode are
+   * rejected. Called by name through {@code VM.invoke} from the two tests.
+   * <ul>
+   * <li>Single-user mode: {@code createAuthenticatedView} on the cache must
+   * throw {@code IllegalStateException}.</li>
+   * <li>Multi-user mode: each of the operations numbered 0 to 27 below must
+   * throw {@code UnsupportedOperationException}.</li>
+   * </ul>
+   * Any other exception, or no exception at all, fails the test.
+   *
+   * @param multiuserMode selects which of the two checks is run
+   */
+  public static void verifyDisallowedOps(Boolean multiuserMode) {
+    String op = "unknown";
+    boolean success;
+
+    if (!multiuserMode) {
+      success = false;
+      try {
+        // Attempt cache.createAuthenticatedView() and expect an exception,
+        // fail otherwise. The label names the call that is actually made.
+        op = "GemFireCacheImpl.createAuthenticatedView()";
+        GemFireCacheImpl.getInstance().createAuthenticatedView(
+            new Properties(), "testPool");
+      } catch (IllegalStateException ise) {
+        Log.getLogWriter().info(op + ": Got expected exception: " + ise);
+        success = true;
+      } catch (Exception e) {
+        fail("Got unexpected exception while doing " + op, e);
+      }
+      if (!success) {
+        fail("Did not get exception while doing " + op);
+      }
+      return;
+    }
+
+    // multiuser mode
+    Region realRegion = GemFireCacheImpl.getInstance().getRegion(
+        SecurityTestUtil.regionName);
+    Region proxyRegion = SecurityTestUtil.proxyCaches[0]
+        .getRegion(SecurityTestUtil.regionName);
+    Pool pool = PoolManagerImpl.getPMI().find("testPool");
+    final int lastOpCode = 27;
+    for (int i = 0; i <= lastOpCode; i++) {
+      success = false;
+      try {
+        switch (i) {
+          // Attempt (real) Region.create/put/get/containsKeyOnServer/destroy/
+          // destroyRegion/clear/remove/registerInterest/unregisterInterest()
+          // and expect an exception, fail otherwise.
+          case 0:
+            op = "Region.create()";
+            realRegion.create("key", "value");
+            break;
+          case 1:
+            op = "Region.put()";
+            realRegion.put("key", "value");
+            break;
+          case 2:
+            op = "Region.get()";
+            realRegion.get("key");
+            break;
+          case 3:
+            op = "Region.containsKeyOnServer()";
+            realRegion.containsKeyOnServer("key");
+            break;
+          case 4:
+            op = "Region.remove()";
+            realRegion.remove("key");
+            break;
+          case 5:
+            op = "Region.destroy()";
+            realRegion.destroy("key");
+            break;
+          case 6:
+            op = "Region.destroyRegion()";
+            realRegion.destroyRegion();
+            break;
+          case 7:
+            op = "Region.registerInterest()";
+            realRegion.registerInterest("key");
+            break;
+          // case 8:
+          // op = "Region.unregisterInterest()";
+          // realRegion.unregisterInterest("key");
+          // break;
+          case 8:
+            op = "Region.clear()";
+            realRegion.clear();
+            break;
+          // Attempt ProxyRegion.createSubregion/forceRolling/
+          // getAttributesMutator/registerInterest/loadSnapShot/saveSnapshot/
+          // setUserAttribute/unregisterInterest/writeToDisk
+          // and expect an exception, fail otherwise.
+          case 9:
+            op = "ProxyRegion.createSubregion()";
+            proxyRegion.createSubregion("subregion",
+                null);
+            break;
+          case 10:
+            op = "ProxyRegion.forceRolling()";
+            proxyRegion.forceRolling();
+            break;
+          case 11:
+            op = "ProxyRegion.getAttributesMutator()";
+            proxyRegion.getAttributesMutator();
+            break;
+          case 12:
+            op = "ProxyRegion.registerInterest()";
+            proxyRegion.registerInterest("key");
+            break;
+          case 13:
+            op = "ProxyRegion.loadSnapshot()";
+            proxyRegion.loadSnapshot(null);
+            break;
+          case 14:
+            op = "ProxyRegion.saveSnapshot()";
+            proxyRegion.saveSnapshot(null);
+            break;
+          case 15:
+            op = "ProxyRegion.setUserAttribute()";
+            proxyRegion.setUserAttribute(null);
+            break;
+          case 16:
+            op = "ProxyRegion.unregisterInterestRegex()";
+            proxyRegion.unregisterInterestRegex("*");
+            break;
+          // Attempt FunctionService.onRegion/onServer/s(pool) and expect an
+          // exception, fail otherwise.
+          case 17:
+            op = "FunctionService.onRegion()";
+            FunctionService.onRegion(realRegion);
+            break;
+          case 18:
+            op = "FunctionService.onServer(pool)";
+            FunctionService.onServer(pool);
+            break;
+          case 19:
+            op = "FunctionService.onServers(pool)";
+            FunctionService.onServers(pool);
+            break;
+          // Attempt QueryService.newQuery().execute()/newCq().execute/
+          // executeWithInitialResults()
+          case 20: {
+            op = "QueryService.newQuery.execute()";
+            Query query = pool.getQueryService().newQuery(
+                "SELECT * FROM /" + SecurityTestUtil.regionName);
+            query.execute();
+            break;
+          }
+          case 21: {
+            op = "QueryService.newCq.execute()";
+            CqQuery cqQuery = pool.getQueryService().newCq(
+                "SELECT * FROM /" + SecurityTestUtil.regionName,
+                new CqAttributesFactory().create());
+            try {
+              cqQuery.execute();
+            } catch (CqException ce) {
+              // Surface the wrapped exception so the catch blocks below can
+              // classify it.
+              throw unwrapCqFailure(ce);
+            }
+            break;
+          }
+          case 22: {
+            op = "QueryService.newCq.executeWithInitialResults()";
+            CqQuery cqQuery = pool.getQueryService().newCq(
+                "SELECT * FROM /" + SecurityTestUtil.regionName,
+                new CqAttributesFactory().create());
+            try {
+              cqQuery.executeWithInitialResults();
+            } catch (CqException ce) {
+              throw unwrapCqFailure(ce);
+            }
+            break;
+          }
+          // Attempt ProxyQueryService.getIndex/createIndex/removeIndex() and
+          // expect an exception, fail otherwise.
+          case 23:
+            op = "ProxyQueryService().getIndexes()";
+            SecurityTestUtil.proxyCaches[0].getQueryService()
+                .getIndexes(null);
+            break;
+          case 24:
+            op = "ProxyQueryService().createIndex()";
+            SecurityTestUtil.proxyCaches[0].getQueryService().createIndex(
+                null, null, null );
+            break;
+          case 25:
+            op = "ProxyQueryService().removeIndexes()";
+            SecurityTestUtil.proxyCaches[0].getQueryService()
+                .removeIndexes();
+            break;
+          case 26:
+            op = "ProxyRegion.localDestroy()";
+            proxyRegion.localDestroy("key");
+            break;
+          case 27:
+            op = "ProxyRegion.localInvalidate()";
+            proxyRegion.localInvalidate("key");
+            break;
+          default:
+            fail("Unknown op code: " + i);
+            break;
+        }
+      } catch (UnsupportedOperationException uoe) {
+        Log.getLogWriter().info(op + ": Got expected exception: " + uoe);
+        success = true;
+      } catch (Exception e) {
+        fail("Got unexpected exception while doing " + op, e);
+      }
+      if (!success) {
+        fail("Did not get exception while doing " + op);
+      }
+    }
+  }
+
+  /**
+   * Returns the exception to rethrow for a failed CQ execution: the cause of
+   * {@code ce} when it is an {@code Exception}, otherwise {@code ce} itself.
+   * Falling back to {@code ce} avoids the NullPointerException or
+   * ClassCastException that a blind cast of a missing or non-Exception cause
+   * would raise, which would hide the original failure.
+   */
+  private static Exception unwrapCqFailure(CqException ce) {
+    Throwable cause = ce.getCause();
+    if (cause instanceof Exception) {
+      return (Exception)cause;
+    }
+    return ce;
+  }
+
+  /**
+   * Runs the superclass tear-down, then closes the caches in the two client
+   * VMs followed by the two server VMs.
+   */
+  @Override
+  public void tearDown2() throws Exception {
+    super.tearDown2();
+
+    // Clients first, servers afterwards; the array order fixes the sequence.
+    for (VM vm : new VM[] {client1, client2, server1, server2}) {
+      vm.invoke(SecurityTestUtil.class, "closeCache");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
new file mode 100644
index 0000000..99c55b1
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/security/MultiuserDurableCQAuthzDUnitTest.java
@@ -0,0 +1,484 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.security;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+import security.AuthzCredentialGenerator;
+import security.CredentialGenerator;
+
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.operations.OperationContext.OperationCode;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.cq.dunit.CqQueryTestListener;
+import com.gemstone.gemfire.cache.query.internal.cq.ClientCQImpl;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.InternalLogWriter;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+
+/**
+ * @author ashetkar
+ *
+ */
+public class MultiuserDurableCQAuthzDUnitTest extends
+    ClientAuthorizationTestBase {
+  
+  /**
+   * Maps a CQ name ("CQ_0", "CQ_1") to the leading part of its query string.
+   * createCQ() appends the full path of the test region to the mapped value
+   * to build the complete query.
+   */
+  public static final Map<String, String> cqNameToQueryStrings = new 
HashMap<String, String>();
+
+  // Both CQs select everything from the region whose path is appended later.
+  static {
+    cqNameToQueryStrings.put("CQ_0", "SELECT * FROM ");
+    cqNameToQueryStrings.put("CQ_1", "SELECT * FROM ");
+  }
+
+  /**
+   * Creates the test case and passes its name on to the superclass.
+   *
+   * @param name the name handed to the superclass constructor
+   */
+  public MultiuserDurableCQAuthzDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Prepares the test: calls {@code getSystem()} in this VM and in every
+   * dunit VM, assigns VMs 0-3 of host 0 to server1, server2, client1 and
+   * client2, and registers the expected exceptions on both servers, on
+   * client2 and in this VM.
+   */
+  public void setUp() throws Exception {
+    super.setUp();
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+
+    final Host host = Host.getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+    client1 = host.getVM(2);
+    client2 = host.getVM(3);
+
+    server1.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
+        new Object[] { serverExpectedExceptions });
+    server2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
+        new Object[] { serverExpectedExceptions });
+    // NOTE(review): client1 is not registered here, only client2 and the
+    // local VM -- confirm this is intentional.
+    client2.invoke(SecurityTestUtil.class, "registerExpectedExceptions",
+        new Object[] { clientExpectedExceptions });
+    SecurityTestUtil.registerExpectedExceptions(clientExpectedExceptions);
+  }
+
+  /**
+   * Runs doTest() for two users and five puts with {@code keepAlive == null},
+   * so the client cache and the proxy caches are closed through their
+   * no-argument close calls.
+   */
+  public void testCQForDurableClientsWithDefaultClose() throws Exception {
+    /*
+     *  1. Start a server.
+     *  2. Start a durable client in multiuser secure mode.
+     *  3. Create two users registering unique durable CQs on server.
+     *  4. Invoke GemFireCache.close() at client.
+     *  5. Put some events on server satisfying both the CQs.
+     *  6. Up the client and the two users.
+     *  7. Confirm that the users receive the events which were enqueued at
+     *     server while they were away.
+     *     NOTE(review): for a null keepAlive, doTest() expects 0 create
+     *     events after the restart -- confirm which expectation is intended.
+     *  8. Same for ProxyCache.close()
+     */
+    Integer numOfUsers = 2;
+    Integer numOfPuts = 5;
+    Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE};
+
+    doTest(numOfUsers, numOfPuts, postAuthzAllowed,
+          getXmlAuthzGenerator(), null);
+  }
+
+  /**
+   * Runs doTest() for two users and five puts with {@code keepAlive == TRUE};
+   * doTest() then expects the events put while the client was down to be
+   * delivered as create events after the client comes back.
+   */
+  public void testCQForDurableClientsWithCloseKeepAliveTrue() throws Exception 
{
+    /*
+     *  1. Start a server.
+     *  2. Start a durable client in multiuser secure mode.
+     *  3. Create two users registering unique durable CQs on server.
+     *  4. Close the client cache with keepAlive = true.
+     *  5. Put some events on server satisfying both the CQs.
+     *  6. Up the client and the two users.
+     *  7. Observe the behaviour.
+     *  8. Same for ProxyCache.close(true)
+     */
+    Integer numOfUsers = 2;
+    Integer numOfPuts = 5;
+    Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE};
+
+    doTest(numOfUsers, numOfPuts, postAuthzAllowed,
+          getXmlAuthzGenerator(), Boolean.TRUE);
+  }
+
+  /**
+   * Runs doTest() for two users and five puts with {@code keepAlive == FALSE};
+   * doTest() then expects no create events after the client comes back.
+   */
+  public void testCQForDurableClientsWithCloseKeepAliveFalse() throws 
Exception {
+    /*
+     *  1. Start a server.
+     *  2. Start a durable client in multiuser secure mode.
+     *  3. Create two users registering unique durable CQs on server.
+     *  4. Close the client cache with keepAlive = false.
+     *  5. Put some events on server satisfying both the CQs.
+     *  6. Up the client and the two users.
+     *  7. Observe the behaviour.
+     *  8. Same for ProxyCache.close(false)
+     */
+    Integer numOfUsers = 2;
+    Integer numOfPuts = 5;
+    Boolean[] postAuthzAllowed = new Boolean[] {Boolean.TRUE, Boolean.TRUE};
+
+    doTest(numOfUsers, numOfPuts, postAuthzAllowed,
+              getXmlAuthzGenerator(), Boolean.FALSE);
+  }
+
+  /**
+   * Shared scenario behind the durable-CQ tests of this class.
+   *
+   * Phase 1: builds the credentials, starts a cache server in server1,
+   * creates the multi-user durable client in client1, then creates and
+   * executes one durable CQ per user.
+   * Phase 2: closes the client cache (passing keepAlive when it is not null),
+   * does the puts on server1, re-creates the client and its CQs and checks
+   * the create-event count of every listener: numOfPuts + 1 when keepAlive is
+   * TRUE, otherwise 0.
+   * Phase 3: closes and re-creates the per-user proxy caches, re-creates the
+   * CQs, repeats the puts and checks every listener against
+   * numOfPuts + 1 update events.
+   *
+   * @param numOfUsers number of users/CQs; NOTE(review): several arguments
+   *        below are hard-coded two-element arrays, so only 2 looks supported
+   * @param numOfPuts number of entries written by doPuts() besides LAST_KEY
+   * @param postAuthzAllowed per user, whether allowed or disallowed
+   *        credentials are generated; also handed to checkCQListeners() as
+   *        the per-user "listener expected to be invoked" flag
+   * @param gen generator supplying credentials and security properties
+   * @param keepAlive null for the no-argument close calls, otherwise the flag
+   *        passed to closeCache and proxyCacheClose
+   */
+  private void doTest(Integer numOfUsers, Integer numOfPuts,
+      Boolean[] postAuthzAllowed, AuthzCredentialGenerator gen, Boolean 
keepAlive)
+      throws Exception {
+    CredentialGenerator cGen = gen.getCredentialGenerator();
+    Properties extraAuthProps = cGen.getSystemProperties();
+    Properties javaProps = cGen.getJavaProperties();
+    Properties extraAuthzProps = gen.getSystemProperties();
+    String authenticator = cGen.getAuthenticator();
+    String accessor = gen.getAuthorizationCallback();
+    String authInit = cGen.getAuthInit();
+    TestAuthzCredentialGenerator tgen = new TestAuthzCredentialGenerator(gen);
+
+    Properties serverProps = buildProperties(authenticator, accessor, true,
+        extraAuthProps, extraAuthzProps);
+
+    Properties opCredentials;
+    cGen = tgen.getCredentialGenerator();
+    Properties javaProps2 = null;
+    if (cGen != null) {
+      javaProps2 = cGen.getJavaProperties();
+    }
+
+    int[] indices = new int[numOfPuts];
+    for (int index = 0; index < numOfPuts; ++index) {
+      indices[index] = index;
+    }
+
+    Random rnd = new Random();
+    Properties[] authProps = new Properties[numOfUsers];
+    String durableClientId = "multiuser_durable_client_1";
+    // NOTE(review): client2Credentials is only consumed by the commented-out
+    // client2 setup further down.
+    Properties client2Credentials = null;
+    for (int i = 0; i < numOfUsers; i++) {
+      int rand = rnd.nextInt(100) + 1;
+      if (postAuthzAllowed[i]) {
+        opCredentials = tgen.getAllowedCredentials(new OperationCode[] {
+            OperationCode.EXECUTE_CQ, OperationCode.GET}, // For callback, GET 
should be allowed
+            new String[] {regionName}, indices, rand);
+      } else {
+        opCredentials = tgen.getDisallowedCredentials(new OperationCode[] {
+            OperationCode.GET}, // For callback, GET should be disallowed
+            new String[] {regionName}, indices, rand);
+      }
+      authProps[i] = SecurityTestUtil.concatProperties(new Properties[] {
+          opCredentials, extraAuthProps, extraAuthzProps});
+
+      if (client2Credentials == null) {
+        client2Credentials = tgen.getAllowedCredentials(new OperationCode[] {
+            OperationCode.PUT},
+            new String[] {regionName}, indices, rand);
+      }
+    }
+
+    // Get ports for the servers
+    // NOTE(review): port2 is handed to the client, but no server is started
+    // in server2 by this method.
+    Integer port1 = new Integer(AvailablePort
+        .getRandomAvailablePort(AvailablePort.SOCKET));
+    Integer port2 = new Integer(AvailablePort
+        .getRandomAvailablePort(AvailablePort.SOCKET));
+    Integer locatorPort = new Integer(AvailablePort
+        .getRandomAvailablePort(AvailablePort.SOCKET));
+    // Close down any running servers
+    server1.invoke(SecurityTestUtil.class, "closeCache");
+    server2.invoke(SecurityTestUtil.class, "closeCache");
+
+    server1.invoke(MultiuserDurableCQAuthzDUnitTest.class,
+        "createServerCache", new Object[] {serverProps, javaProps, 
locatorPort, port1});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class,
+        "createClientCache", new Object[] {javaProps2, authInit, authProps,
+            new Integer[] {port1, port2}, numOfUsers, durableClientId, 
postAuthzAllowed});
+
+//    client2.invoke(SecurityTestUtil.class, "createCacheClient",
+//        new Object[] {authInit, client2Credentials, javaProps2,
+//            new Integer[] {port1, port2}, null, 
SecurityTestUtil.NO_EXCEPTION});
+
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ",
+        new Object[] {numOfUsers, Boolean.TRUE});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ",
+        new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts,
+            new String[numOfUsers]});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "readyForEvents");
+
+    // Phase 2: take the client down, using keepAlive only when it was given.
+    if (keepAlive == null) {
+      client1.invoke(SecurityTestUtil.class, "closeCache");
+    } else {
+      client1.invoke(SecurityTestUtil.class, "closeCache",
+          new Object[] {keepAlive});
+    }
+
+    server1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "doPuts",
+        new Object[] {numOfPuts, Boolean.TRUE/* put last key */});
+
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class,
+        "createClientCache", new Object[] {javaProps2, authInit, authProps,
+            new Integer[] {port1, port2}, numOfUsers, durableClientId, 
postAuthzAllowed});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ",
+        new Object[] {numOfUsers, Boolean.TRUE});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ",
+        new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts,
+            new String[numOfUsers]});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "readyForEvents");
+
+    if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) {
+      // Don't wait as no user is authorized to receive cq events.
+      Thread.sleep(1000);
+    } else {
+      client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "waitForLastKey",
+          new Object[] {Integer.valueOf(0), Boolean.TRUE});
+    }
+    // Create events are expected only when the client was closed with
+    // keepAlive == TRUE.
+    Integer numOfCreates = (keepAlive == null) ? 0
+        : (keepAlive) ? (numOfPuts + 1/* last key */) : 0;
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "checkCQListeners",
+        new Object[] {numOfUsers, postAuthzAllowed, numOfCreates, 0});
+
+    // Phase 3: repeat the exercise by closing only the per-user proxy caches.
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "proxyCacheClose",
+        new Object[] {new Integer[] {0, 1}, keepAlive});
+
+    client1.invoke(SecurityTestUtil.class, "createProxyCache",
+        new Object[] {new Integer[] {0, 1}, authProps});
+
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "createCQ",
+        new Object[] {numOfUsers, Boolean.TRUE});
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "executeCQ",
+        new Object[] {numOfUsers, new Boolean[] {false, false}, numOfPuts,
+            new String[numOfUsers]});
+
+    server1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "doPuts",
+        new Object[] {numOfPuts, Boolean.TRUE/* put last key */});
+
+    if (!postAuthzAllowed[0] || keepAlive == null || !keepAlive) {
+      // Don't wait as no user is authorized to receive cq events.
+      Thread.sleep(1000);
+    } else {
+      client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "waitForLastKey",
+          new Object[] {Integer.valueOf(0), Boolean.FALSE});
+    }
+    Integer numOfUpdates = numOfPuts + 1;
+    client1.invoke(MultiuserDurableCQAuthzDUnitTest.class, "checkCQListeners",
+        new Object[] {numOfUsers, postAuthzAllowed, 0, numOfUpdates});
+  }
+
+  /**
+   * Creates a cache server through
+   * {@code SecurityTestUtil.createCacheServer}, expecting no exception.
+   *
+   * @param serverProps properties for the server
+   * @param javaProps java properties passed through to the utility
+   * @param locatorPort locator port passed through to the utility
+   * @param serverPort port passed through as the server port
+   */
+  public static void createServerCache(Properties serverProps,
+      Properties javaProps, Integer locatorPort, Integer serverPort) {
+    final Integer expectedResult = new Integer(
+        SecurityTestUtil.NO_EXCEPTION);
+    SecurityTestUtil.createCacheServer(serverProps, javaProps, locatorPort,
+        null, serverPort, Boolean.TRUE, expectedResult);
+  }
+
+  /**
+   * Creates a multi-user client cache through
+   * {@code SecurityTestUtil.createCacheClientForMultiUserMode}, without a
+   * durable client id and expecting no exception.
+   *
+   * @param javaProps java properties passed through to the utility
+   * @param authInit auth-init setting passed through to the utility
+   * @param authProps per-user credential properties
+   * @param ports server ports passed through to the utility
+   * @param numOfUsers number of users to create
+   * @param postAuthzAllowed not used by this method
+   */
+  public static void createClientCache(Properties javaProps,
+      String authInit, Properties[] authProps, Integer ports[],
+      Integer numOfUsers, Boolean[] postAuthzAllowed) {
+    SecurityTestUtil.createCacheClientForMultiUserMode(numOfUsers, authInit,
+        authProps, javaProps, ports, null, Boolean.FALSE,
+        SecurityTestUtil.NO_EXCEPTION);
+  }
+
+  /**
+   * Calls {@code readyForEvents()} on the current GemFireCacheImpl instance
+   * of the VM this method runs in.
+   */
+  public static void readyForEvents() {
+    GemFireCacheImpl.getInstance().readyForEvents();
+  }
+
+  /**
+   * Creates a multi-user client cache through
+   * {@code SecurityTestUtil.createCacheClientForMultiUserMode}, passing the
+   * given durable client id and expecting no exception.
+   *
+   * @param javaProps java properties passed through to the utility
+   * @param authInit auth-init setting passed through to the utility
+   * @param authProps per-user credential properties
+   * @param ports server ports passed through to the utility
+   * @param numOfUsers number of users to create
+   * @param durableId durable client id passed through to the utility
+   * @param postAuthzAllowed not used by this method
+   */
+  public static void createClientCache(Properties javaProps,
+      String authInit, Properties[] authProps, Integer ports[],
+      Integer numOfUsers, String durableId, Boolean[] postAuthzAllowed) {
+    SecurityTestUtil.createCacheClientForMultiUserMode(numOfUsers, authInit,
+        authProps, javaProps, ports, null, Boolean.FALSE, durableId,
+        SecurityTestUtil.NO_EXCEPTION);
+  }
+
+  /**
+   * Creates non-durable CQs for the first {@code num} users; shorthand for
+   * {@code createCQ(num, false)}.
+   *
+   * @param num number of users/CQs
+   */
+  public static void createCQ(Integer num) {
+    createCQ(num, false);
+  }
+
+  /**
+   * Registers one CQ named {@code "CQ_" + index} for each of the first
+   * {@code num} users, using that user's proxy cache. Every CQ gets a single
+   * CqQueryTestListener and must be in the stopped state right after
+   * creation.
+   *
+   * @param num number of users/CQs
+   * @param isDurable durability flag handed to {@code newCq}
+   */
+  public static void createCQ(Integer num, Boolean isDurable) {
+    final int cqCount = num;
+    for (int userIdx = 0; userIdx < cqCount; userIdx++) {
+      final QueryService queryService =
+          SecurityTestUtil.proxyCaches[userIdx].getQueryService();
+      final String name = "CQ_" + userIdx;
+      final String regionPath = SecurityTestUtil.proxyCaches[userIdx]
+          .getRegion(regionName).getFullPath();
+      final String query = cqNameToQueryStrings.get(name) + regionPath;
+
+      // Attributes carrying one named test listener.
+      final CqAttributesFactory factory = new CqAttributesFactory();
+      final CqQueryTestListener listener =
+          new CqQueryTestListener(getLogWriter());
+      listener.cqName = name;
+      factory.initCqListeners(new CqListener[] {listener});
+      final CqAttributes attributes = factory.create();
+
+      // Register the CQ; any failure is reported as an AssertionError.
+      try {
+        final CqQuery created =
+            queryService.newCq(name, query, attributes, isDurable);
+        assertTrue("newCq() state mismatch", created.getState().isStopped());
+      } catch (Exception cause) {
+        final AssertionError failure = new AssertionError(
+            "Failed to create CQ " + name + " . ");
+        failure.initCause(cause);
+        getLogWriter().info("CqService is :" + queryService, failure);
+        throw failure;
+      }
+    }
+  }
+
+  public static void executeCQ(Integer num, Boolean[] initialResults,
+      Integer expectedResultsSize, String[] expectedErr) {
+    InternalLogWriter logWriter = 
InternalDistributedSystem.getStaticInternalLogWriter();
+    for (int i = 0; i < num; i++) {
+      try {
+        if (expectedErr[i] != null) {
+          logWriter.info(
+              "<ExpectedException action=add>" + expectedErr[i]
+                  + "</ExpectedException>");
+        }
+        CqQuery cq1 = null;
+        String cqName = "CQ_" + i;
+        String queryStr = cqNameToQueryStrings.get(cqName)
+            + SecurityTestUtil.proxyCaches[i].getRegion(regionName)
+                .getFullPath();
+        QueryService cqService = SecurityTestUtil.proxyCaches[i]
+            .getQueryService();
+
+        // Get CqQuery object.
+        try {
+          cq1 = cqService.getCq(cqName);
+          if (cq1 == null) {
+            getLogWriter().info(
+                "Failed to get CqQuery object for CQ name: " + cqName);
+            fail("Failed to get CQ " + cqName);
+          } else {
+            getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName());
+            assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+          }
+        } catch (Exception ex) {
+          getLogWriter().info("CqService is :" + cqService);
+          getLogWriter().error(ex);
+          AssertionError err = new AssertionError("Failed to execute CQ "
+              + cqName);
+          err.initCause(ex);
+          throw err;
+        }
+
+        if (initialResults[i]) {
+          SelectResults cqResults = null;
+
+          try {
+            cqResults = cq1.executeWithInitialResults();
+          } catch (Exception ex) {
+            getLogWriter().info("CqService is: " + cqService);
+            ex.printStackTrace();
+            AssertionError err = new AssertionError("Failed to execute CQ "
+                + cqName);
+            err.initCause(ex);
+            throw err;
+          }
+          getLogWriter().info("initial result size = " + cqResults.size());
+          assertTrue("executeWithInitialResults() state mismatch", cq1
+              .getState().isRunning());
+          if (expectedResultsSize >= 0) {
+            assertEquals("unexpected results size", expectedResultsSize
+                .intValue(), cqResults.size());
+          }
+        } else {
+          try {
+            cq1.execute();
+          } catch (Exception ex) {
+            AssertionError err = new AssertionError("Failed to execute CQ "
+                + cqName);
+            err.initCause(ex);
+            if (expectedErr == null) {
+              getLogWriter().info("CqService is: " + cqService, err);
+            }
+            throw err;
+          }
+          assertTrue("execute() state mismatch", cq1.getState().isRunning());
+        }
+      } finally {
+        if (expectedErr[i] != null) {
+          logWriter.info(
+              "<ExpectedException action=remove>" + expectedErr[i]
+                  + "</ExpectedException>");
+        }
+      }
+    }
+  }
+
+  /**
+   * Puts {@code num} entries ("CQ_key" + i mapped to "CQ_value" + i) into the
+   * test region of the current cache and optionally finishes with the
+   * LAST_KEY marker entry.
+   *
+   * @param num number of numbered entries to put
+   * @param putLastKey whether to put the "LAST_KEY" entry afterwards
+   */
+  public static void doPuts(Integer num, Boolean putLastKey) {
+    final Region testRegion =
+        GemFireCacheImpl.getInstance().getRegion(regionName);
+    final int putCount = num;
+    for (int entry = 0; entry < putCount; entry++) {
+      final String key = "CQ_key" + entry;
+      final String value = "CQ_value" + entry;
+      testRegion.put(key, value);
+    }
+    if (putLastKey.booleanValue()) {
+      testRegion.put("LAST_KEY", "LAST_KEY");
+    }
+  }
+
+  /**
+   * Puts the "LAST_KEY" marker entry into the test region of the current
+   * cache; waitForLastKey() waits for the CQ event of this key.
+   */
+  public static void putLastKey() {
+    Region region = GemFireCacheImpl.getInstance().getRegion(regionName);
+    region.put("LAST_KEY", "LAST_KEY");
+  }
+
+  /**
+   * Blocks until the listener of CQ {@code "CQ_" + cqIndex} has seen the
+   * "LAST_KEY" event.
+   *
+   * @param cqIndex index of the user/proxy cache whose CQ is checked
+   * @param isCreate {@code true} to wait for a create event, {@code false}
+   *        to wait for an update event
+   */
+  public static void waitForLastKey(Integer cqIndex, Boolean isCreate) {
+    String cqName = "CQ_" + cqIndex;
+    QueryService qService =
+        SecurityTestUtil.proxyCaches[cqIndex].getQueryService();
+    ClientCQImpl cqQuery = (ClientCQImpl)qService.getCq(cqName);
+    // createCQ() registers exactly one listener per CQ, so the listener is
+    // always at position 0. Indexing by cqIndex would run past the end of
+    // the array for every CQ other than CQ_0.
+    CqQueryTestListener listener =
+        (CqQueryTestListener)cqQuery.getCqListeners()[0];
+    if (isCreate) {
+      listener.waitForCreated("LAST_KEY");
+    } else {
+      listener.waitForUpdated("LAST_KEY");
+    }
+  }
+
+  /**
+   * Verifies the event counters of every listener attached to the CQs of the
+   * first {@code numOfUsers} users.
+   *
+   * @param numOfUsers number of users/CQs to check
+   * @param expectedListenerInvocation per user: {@code true} if the listeners
+   *        must show exactly the given create/update counts, {@code false}
+   *        if they must not have received any event at all
+   * @param createEventsSize expected number of create events
+   * @param updateEventsSize expected number of update events
+   */
+  public static void checkCQListeners(Integer numOfUsers,
+      Boolean[] expectedListenerInvocation, Integer createEventsSize,
+      Integer updateEventsSize) {
+    final int userCount = numOfUsers;
+    for (int userIdx = 0; userIdx < userCount; userIdx++) {
+      final QueryService queryService =
+          SecurityTestUtil.proxyCaches[userIdx].getQueryService();
+      final ClientCQImpl cq =
+          (ClientCQImpl)queryService.getCq("CQ_" + userIdx);
+      final boolean eventsExpected = expectedListenerInvocation[userIdx];
+      for (CqListener attached : cq.getCqListeners()) {
+        if (!eventsExpected) {
+          assertEquals(0,
+              ((CqQueryTestListener)attached).getTotalEventCount());
+          continue;
+        }
+        assertEquals(createEventsSize.intValue(),
+            ((CqQueryTestListener)attached).getCreateEventCount());
+        assertEquals(updateEventsSize.intValue(),
+            ((CqQueryTestListener)attached).getUpdateEventCount());
+      }
+    }
+  }
+
+  /**
+   * Closes the proxy caches of the given users with the no-argument
+   * {@code close()}; shorthand for {@code proxyCacheClose(userIndices, null)}.
+   *
+   * @param userIndices indices into {@code SecurityTestUtil.proxyCaches}
+   */
+  public static void proxyCacheClose(Integer[] userIndices) {
+    proxyCacheClose(userIndices, null);
+  }
+
+  public static void proxyCacheClose(Integer[] userIndices, Boolean 
keepAliveFlags) {
+    if (keepAliveFlags != null) {
+      for (int i : userIndices) {
+        SecurityTestUtil.proxyCaches[i].close(keepAliveFlags);
+      }
+    } else {
+      for (int i : userIndices) {
+        SecurityTestUtil.proxyCaches[i].close();
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/.keepme
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/resources/.keepme 
b/gemfire-cq/src/test/resources/.keepme
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt
 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt
new file mode 100644
index 0000000..c55df1d
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/excludedClasses.txt
@@ -0,0 +1,2 @@
+# e.g.,
+#com/gemstone/gemfire/cache/query/CqAttributesFactory

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt
new file mode 100644
index 0000000..6c40e24
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/openBugs.txt
@@ -0,0 +1,21 @@
+# This is a list of classes excluded due to open bugs about their having
+# incompatible changes.  There should be no entries in this file at the
+# time of a product release.
+
+# Each entry should be a bug number followed by a comma and the
+# full class name.  The package components can be delimited with a period
+# or a comma.  Don't include ".class" or ".java" at the end of the name.
+
+# example:  
50174,com/gemstone/gemfire/distributed/internal/StartupResponseWithVersionMessage
+# example:  50175,com.gemstone.org.jgroups.Message$Header
+
+
+# ~~~~~~~~~~~~~~~~~~~ DataSerializables ~~~~~~~~~~~~~~~~~~~~~~~~~
+# these are failures from testDataSerializables
+
+
+# ~~~~~~~~~~~~~~~~~~~ Serializables ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+# these are failures from testSerializables
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
new file mode 100644
index 0000000..eb9d8d6
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedDataSerializables.txt
@@ -0,0 +1,4 @@
+com/gemstone/gemfire/cache/query/internal/cq/ServerCQImpl,2
+fromData,64,2ab40046594dc22ab400462bb80099b6009ab6007f2cc3a700084e2cc32dbf2a2bb8009bb6009cb5009d2a2bb8009eb5005d2a2bb9009f0100b800a0b50008b1
+toData,47,2ab40046b60047b800a12bb800a22ab4009db8007c2bb800a32ab4005d2bb800a42b2ab40008b600a5b900a60300b1
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
new file mode 100755
index 0000000..ddd9718
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/codeAnalysis/sanctionedSerializables.txt
@@ -0,0 +1 @@
+com/gemstone/gemfire/cache/query/internal/cq/CqConflatable,true,-7215022132135862557,conflate:boolean,id:com/gemstone/gemfire/internal/cache/EventID,key:java/lang/Object,regionname:java/lang/String,value:java/lang/Object

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml
 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml
new file mode 100644
index 0000000..afde78f
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-client-cache.xml
@@ -0,0 +1,21 @@
+<?xml version="1.0"?>
+ <!DOCTYPE client-cache PUBLIC
+    "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.6//EN"
+    "http://www.gemstone.com/dtd/cache6_6.dtd">
+    
+
+<client-cache>
+    <pool 
+    name="client" 
+    subscription-enabled="true" 
+    load-conditioning-interval="60000"
+    read-timeout="30000" retry-attempts="5" >
+        <server host="localhost" port="10188" />
+    </pool>
+
+    <region name="testReadyForEventsNotCalledImplicitlyWithCacheXML_region" >
+    <region-attributes 
id="testReadyForEventsNotCalledImplicitlyWithCacheXML_region" 
statistics-enabled="true"  pool-name="client" refid="PROXY">
+        
<cache-listener><class-name>com.gemstone.gemfire.internal.cache.tier.sockets.CacheServerTestUtil$ControlListener</class-name></cache-listener>
+    </region-attributes>
+    </region>
+</client-cache>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml
 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml
new file mode 100644
index 0000000..2d58506
--- /dev/null
+++ 
b/gemfire-cq/src/test/resources/com/gemstone/gemfire/internal/cache/tier/sockets/durablecq-server-cache.xml
@@ -0,0 +1,16 @@
+<?xml version="1.0"?>
+<!DOCTYPE cache PUBLIC
+    "-//GemStone Systems, Inc.//GemFire Declarative Caching 6.6//EN"
+    "http://www.gemstone.com/dtd/cache6_6.dtd">
+
+<cache is-server="true">
+    <!-- Define this cache server -->
+    <cache-server port="10188" >
+        <client-subscription eviction-policy="entry" capacity="1000"/>
+    </cache-server>
+    <region name="testReadyForEventsNotCalledImplicitlyWithCacheXML_region">
+        <region-attributes  data-policy="partition" 
enable-subscription-conflation="false">
+             <partition-attributes redundant-copies="0" 
total-num-buckets="113"/>
+        </region-attributes>
+    </region>
+</cache>

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt 
b/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt
new file mode 100644
index 0000000..ea6685c
--- /dev/null
+++ b/gemfire-cq/src/test/resources/cq/statistics/cqStats.bt
@@ -0,0 +1,3 @@
+cq/statistics/cqStatsHCT.conf
+edgeHosts =1 edgeVMsPerHost = 1 edgeThreadsPerVM = 1
+bridgeHosts =1 bridgeVMsPerHost = 1 bridgeThreadsPerVM = 1

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf 
b/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf
new file mode 100644
index 0000000..4623a5d
--- /dev/null
+++ b/gemfire-cq/src/test/resources/cq/statistics/cqStatsHCT.conf
@@ -0,0 +1,100 @@
+hydra.Prms-testRequirement = "Test CQ with a variety of operations with 
careful validation and serial execution in a bridge configuration";
+hydra.Prms-testDescription = "
+This test executes operations on entries on a region and carefully
+validates for correctness in the CQs. 
+The test uses serial round robin; the first thread
+in the round does a random operation, then all other threads in the round
+verify their view of the operation. After the last thread in the round 
verifies,
+it then becomes the first in the next round, thus the thread doing the random
+operation changes for each round. 
+";
+
+INCLUDE $JTESTS/hydraconfig/hydraparams1.inc;
+INCLUDE $JTESTS/hydraconfig/topology_hct.inc;
+
+hydra.VmPrms-extraClassPaths =
+   fcn "hydra.TestConfigFcns.duplicate(\"$GEMFIRE/lib/antlr.jar\", 
${bridgeHosts})" ncf;
+
+THREADGROUP bridgeThreads
+  totalThreads = fcn
+                 ${bridgeHosts} * ${bridgeVMsPerHost} * ${bridgeThreadsPerVM}
+                 ncf
+  clientNames  = fcn "hydra.TestConfigFcns.generateNames
+                      (\"bridge\", ${bridgeHosts}, true)"
+                 ncf;
+THREADGROUP edgeThreads
+  totalThreads = fcn
+                 ${edgeHosts} * ${edgeVMsPerHost} * ${edgeThreadsPerVM}
+                 ncf
+  clientNames  = fcn "hydra.TestConfigFcns.generateNames
+                      (\"edge\", ${edgeHosts}, true)"
+                 ncf;         
+
+INITTASK     taskClass   = cq.statistics.CQStatisticsTest  taskMethod = 
HydraTask_initializeBridgeServer
+             threadGroups = bridgeThreads
+             runMode = always;
+
+INITTASK     taskClass   = cq.statistics.CQStatisticsTest  taskMethod = 
HydraTask_initializeClient
+             threadGroups = edgeThreads
+             runMode = always;
+
+INITTASK         taskClass   = cq.statistics.CQStatisticsTest  taskMethod = 
HydraTask_verifyCqStatistics
+             threadGroups = edgeThreads;
+
+hydra.GemFirePrms-stopSystemsAfterTest = true;
+
+hydra.Prms-totalTaskTimeSec = 1200;
+hydra.Prms-maxResultWaitSec = 600;
+hydra.Prms-serialExecution = true;
+hydra.Prms-roundRobin = true;
+
+INCLUDE $JTESTS/util/randomValues.inc;
+util.RandomValuesPrms-objectType = byte[];
+util.RandomValuesPrms-elementSize = 1000;
+util.RandomValuesPrms-borderCasePercentage = 0;
+util.ValueHolderPrms-useExtraObject = true;
+hydra.GemFirePrms-conserveSockets = ONEOF true false FOENO;
+
+hydra.CachePrms-names           = cache1;
+hydra.CachePrms-searchTimeout   = 600;
+
+hydra.Prms-useFixedRandomInMaster= true;
+hydra.RegionPrms-names          = clientRegion            serverRegion;
+hydra.RegionPrms-regionName     = testRegion              testRegion;
+hydra.RegionPrms-scope          = local                   ack;
+hydra.RegionPrms-poolName       = edgeDescript            none;
+hydra.RegionPrms-dataPolicy     = normal                  replicate;
+hydra.RegionPrms-cacheListeners = util.SilenceListener,   
hct.BridgeEventListener;
+hydra.RegionPrms-partitionName  = none                    none;
+
+cq.CQUtilPrms-numOpsPerTask = 100;
+
+util.CachePrms-useDeclarativeXmlFile = ONEOF true false FOENO;
+util.TestHelperPrms-minTaskGranularitySec = 30;
+
+cq.CQUtilPrms-serverEntryOperations = ONEOF add add update invalidate get 
destroy FOENO; 
+cq.CQUtilPrms-clientEntryOperations = ONEOF add add update get destroy FOENO; 
+cq.CQUtilPrms-upperThreshold = 500;
+cq.CQUtilPrms-upperThresholdServerOperations = ONEOF destroy FOENO;
+cq.CQUtilPrms-upperThresholdClientOperations = ONEOF destroy FOENO;
+cq.CQUtilPrms-lowerThreshold = 0;
+cq.CQUtilPrms-lowerThresholdServerOperations = add;
+cq.CQUtilPrms-lowerThresholdClientOperations = add;
+cq.CQUtilPrms-queryDepth = 7;
+cq.CQUtilPrms-numQueriesPerClientVM = 100; 
+cq.CQUtilPrms-QueryServicePoolName = qservice;
+cq.CQUtilPrms-QueryServiceUsingPool = ONEOF true false FOENO;
+
+// define the edge clients
+hydra.PoolPrms-names                       = edgeDescript qservice;
+hydra.PoolPrms-minConnections        = 2;
+hydra.PoolPrms-subscriptionEnabled = true;
+hydra.PoolPrms-threadLocalConnections         = true;
+hydra.PoolPrms-readTimeout                 = 800000;
+hydra.PoolPrms-subscriptionRedundancy             = RANGE 0
+                                                   fcn ${bridgeHosts} * 
${bridgeVMsPerHost} ncf
+                                             EGNAR;
+
+// define the bridge servers
+hydra.BridgePrms-names                = bridge;
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-wan/build.gradle b/gemfire-wan/build.gradle
new file mode 100644
index 0000000..eba2539
--- /dev/null
+++ b/gemfire-wan/build.gradle
@@ -0,0 +1,5 @@
+dependencies {
+  provided project(':gemfire-core')
+  provided project(path: ':gemfire-core', configuration: 'testOutput')
+  provided project(path: ':gemfire-junit', configuration: 'testOutput')
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/.keepme
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/main/java/.keepme 
b/gemfire-wan/src/main/java/.keepme
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
 
b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
new file mode 100644
index 0000000..15be615
--- /dev/null
+++ 
b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/GatewaySenderBatchOp.java
@@ -0,0 +1,304 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014, Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import com.gemstone.gemfire.InternalGemFireError;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.cache.EventID;
+import com.gemstone.gemfire.internal.cache.tier.MessageType;
+import com.gemstone.gemfire.internal.cache.tier.sockets.ChunkedMessage;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
+import com.gemstone.gemfire.internal.cache.tier.sockets.Part;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
+import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher;
+import 
com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventRemoteDispatcher.GatewayAck;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.logging.log4j.Logger;
+
+public class GatewaySenderBatchOp {
+  
+  private static final Logger logger = LogService.getLogger();
+  
+  /**
+   * Send a list of gateway events to a server to execute
+   * using connections from the given pool
+   * to communicate with the server.
+   * @param con the connection to send the message on.
+   * @param pool the pool to use to communicate with the server.
+   * @param events list of gateway events
+   * @param batchId the ID of this batch
+   * @param removeFromQueueOnException true if the events should be processed 
even after some exception
+   */
+  public static void executeOn(Connection con, ExecutablePool pool, List 
events, int batchId, boolean removeFromQueueOnException, boolean isRetry)
+  {
+    AbstractOp op = null;
+    //System.out.println("Version: "+con.getWanSiteVersion());
+    if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) {
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, 
removeFromQueueOnException, con.getDistributedSystemId(), isRetry);
+    } else {
+      // Default should create a batch of server version (ACCEPTOR.VERSION)
+      op = new GatewaySenderGFEBatchOpImpl(events, batchId, 
removeFromQueueOnException, con.getDistributedSystemId(), isRetry);     
+    }
+    pool.executeOn(con, op, true/*timeoutFatal*/);
+  }
+  
+  
+  public static Object executeOn(Connection con, ExecutablePool pool)
+  {
+    AbstractOp op = null;
+    //System.out.println("Version: "+con.getWanSiteVersion());
+    // [sumedh] both cases are now same; why switch-case?
+    if (Version.GFE_651.compareTo(con.getWanSiteVersion()) >= 0) {
+      op = new GatewaySenderGFEBatchOpImpl();
+    } else {
+      // Default should create a batch of server version (ACCEPTOR.VERSION)
+      op = new GatewaySenderGFEBatchOpImpl();
+    }
+    return pool.executeOn(con, op, true/*timeoutFatal*/);
+  }
+                                                               
+  private GatewaySenderBatchOp() {
+    // no instances allowed
+  }
+  
+  static class GatewaySenderGFEBatchOpImpl extends AbstractOp {
+    
+    /**
+     * @throws com.gemstone.gemfire.SerializationException if serialization 
fails
+     */
+    public GatewaySenderGFEBatchOpImpl(List events, int batchId, boolean 
removeFromQueueOnException, int dsId, boolean isRetry)  {
+      super(MessageType.GATEWAY_RECEIVER_COMMAND, calcPartCount(events));
+      if (isRetry) {
+        getMessage().setIsRetry();
+      }
+      getMessage().addIntPart(events.size());
+      getMessage().addIntPart(batchId);
+      getMessage().addIntPart(dsId);
+      getMessage().addBytesPart(
+          new byte[] { removeFromQueueOnException ? (byte)1 : (byte)0 });
+      // Add each event
+      for (Iterator i = events.iterator(); i.hasNext();) {
+        GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next();
+        // Add action
+        int action = event.getAction();
+        getMessage().addIntPart(action);
+        { // Add posDup flag
+          byte posDupByte = (byte)(event.getPossibleDuplicate()?0x01:0x00);
+          getMessage().addBytesPart(new byte[] {posDupByte});
+        }
+        if (action >= 0 && action <= 3) {
+          // 0 = create
+          // 1 = update
+          // 2 = destroy (NOTE(review): action 3 also passes this guard but is not listed; confirm its meaning)
+          String regionName = event.getRegionPath();
+          EventID eventId = event.getEventId();
+          Object key = event.getKey();
+          Object callbackArg = event.getSenderCallbackArgument();
+
+          // Add region name
+          getMessage().addStringPart(regionName);
+          // Add event id
+          getMessage().addObjPart(eventId);
+          // Add key
+          getMessage().addStringOrObjPart(key);
+          if (action < 2 /* it is 0 or 1 */) {
+            byte[] value = event.getSerializedValue();
+            byte valueIsObject = event.getValueIsObject();;
+            // Add value (which is already a serialized byte[])
+            getMessage().addRawPart(value, (valueIsObject == 0x01));
+          }
+          // Add callback arg if necessary
+          if (callbackArg == null) {
+            getMessage().addBytesPart(new byte[] {0x00});
+          } else {
+            getMessage().addBytesPart(new byte[] {0x01});
+            getMessage().addObjPart(callbackArg);
+          }
+          getMessage().addLongPart(event.getVersionTimeStamp());
+        }
+      }
+    }
+
+    public GatewaySenderGFEBatchOpImpl() {
+      super(MessageType.GATEWAY_RECEIVER_COMMAND, 0);
+    }
+
+    @Override
+    public Object attempt(Connection cnx) throws Exception {
+      if (getMessage().getNumberOfParts() == 0) {
+        return attemptRead(cnx);
+      }
+      this.failed = true;
+      this.timedOut = false;
+      long start = startAttempt(cnx.getStats());
+      try {
+        try {
+          attemptSend(cnx);
+          this.failed = false;
+        } finally {
+          endSendAttempt(cnx.getStats(), start);
+        }
+      } finally {
+        endAttempt(cnx.getStats(), start);
+      }
+      return this.failed;
+    }
+    
+    private Object attemptRead(Connection cnx) throws Exception {
+      this.failed = true;
+      try {
+        Object result = attemptReadResponse(cnx);
+        this.failed = false;
+        return result;
+      } catch (SocketTimeoutException ste) {
+        this.failed = false;
+        this.timedOut = true;
+        throw ste;
+      } catch (Exception e) {
+        throw e;
+      }
+    }
+    
+    
+    /**
+     * Attempts to read a response to this operation by reading it from the
+     * given connection, and returning it.
+     * @param cnx the connection to read the response from
+     * @return the result of the operation
+     *         or <code>null</code> if the operation has no result.
+     * @throws Exception if the execute failed
+     */
+    protected Object attemptReadResponse(Connection cnx) throws Exception {
+      Message msg = createResponseMessage();
+      if (msg != null) {
+        msg.setComms(cnx.getSocket(), cnx.getInputStream(),
+            cnx.getOutputStream(),
+            ((ConnectionImpl)cnx).getCommBufferForAsyncRead(), cnx.getStats());
+        if (msg instanceof ChunkedMessage) {
+          try {
+            return processResponse(msg, cnx);
+          } finally {
+            msg.unsetComms();
+            // TODO (ashetkar) Handle the case when we fail to read the
+            // connection id.
+            processSecureBytes(cnx, msg);
+          }
+        }
+
+        try {
+          msg.recv();
+        } finally {
+          msg.unsetComms();
+          processSecureBytes(cnx, msg);
+        }
+        return processResponse(msg, cnx);
+      }
+      
+      return null;
+    }
+    
+    
+    private static int calcPartCount(List events) {
+      int numberOfParts = 4; // header parts: event count, batchId, dsId, removeFromQueueOnException flag
+      for (Iterator i = events.iterator(); i.hasNext();) {
+        GatewaySenderEventImpl event = (GatewaySenderEventImpl)i.next();
+        numberOfParts += event.getNumberOfParts();
+      }
+      return numberOfParts;
+    }
+
+    @Override
+    protected void processSecureBytes(Connection cnx, Message message)
+        throws Exception {
+    }
+
+    @Override
+    protected boolean needsUserId() {
+      return false;
+    }
+
+    @Override
+    protected void sendMessage(Connection cnx) throws Exception {
+      getMessage().setEarlyAck((byte)(getMessage().getEarlyAckByte() & 
Message.MESSAGE_HAS_SECURE_PART));
+      getMessage().send(false);
+    }
+
+    @Override
+    protected Object processResponse(Message msg) throws Exception {
+      GatewayAck ack = null;
+      try {
+        // Read the header which describes the type of message following
+        switch (msg.getMessageType()) {
+        case MessageType.REPLY:
+          // Read the chunk
+          int batchId = msg.getPart(0).getInt();
+          int numEvents = msg.getPart(1).getInt();
+          ack = new GatewayAck(batchId, numEvents);
+          break;
+        case MessageType.EXCEPTION:
+          Part part0 = msg.getPart(0);
+
+          Object obj = part0.getObject();
+          if (obj instanceof List) {
+            List<BatchException70> l = 
(List<BatchException70>)part0.getObject();
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("We got an exception from the GatewayReceiver. 
MessageType : {} obj :{}", msg.getMessageType(), obj);
+            }
+            // don't throw Exception but set it in the Ack
+            BatchException70 be = new BatchException70(l);
+            ack = new GatewayAck(be, l.get(0).getBatchId());
+
+          } else if (obj instanceof Throwable) {
+            String s = ": While reading Ack from receiver "
+                + ((Throwable)obj).getMessage();
+            throw new ServerOperationException(s, (Throwable)obj);
+          }
+          break;
+        default:
+          throw new InternalGemFireError(
+              LocalizedStrings.Op_UNKNOWN_MESSAGE_TYPE_0
+                  .toLocalizedString(Integer.valueOf(msg.getMessageType())));
+        }
+      } finally {
+        msg.clear();
+      }
+      return ack;
+    }
+    
+    @Override
+    protected boolean isErrorResponse(int msgType) {
+      return false;
+    }
+    @Override
+    protected long startAttempt(ConnectionStats stats) {
+      return stats.startGatewayBatch();
+    }
+    @Override
+    protected void endSendAttempt(ConnectionStats stats, long start) {
+      stats.endGatewayBatchSend(start, hasFailed());
+    }
+    @Override
+    protected void endAttempt(ConnectionStats stats, long start) {
+      stats.endGatewayBatch(start, hasTimedOut(), hasFailed());
+    }
+    
+    @Override
+    public boolean isGatewaySenderOp() {
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
 
b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
new file mode 100644
index 0000000..d2030b3
--- /dev/null
+++ 
b/gemfire-wan/src/main/java/com/gemstone/gemfire/cache/client/internal/SenderProxy.java
@@ -0,0 +1,34 @@
+/*=========================================================================
+ * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.client.internal;
+
+import java.util.List;
+
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+
+/**
+ * Used to send operations from a sender to a receiver.
+ * @author skumar
+ * @since 8.1
+ */
+public class SenderProxy extends ServerProxy{
+  public SenderProxy(InternalPool pool) {
+    super(pool);
+  }
+
+  public void dispatchBatch_NewWAN(Connection con, List events, int batchId, 
boolean removeFromQueueOnException, boolean isRetry)
+  {
+    GatewaySenderBatchOp.executeOn(con, this.pool, events, batchId, 
removeFromQueueOnException, isRetry);
+  }
+  
+  public Object receiveAckFromReceiver(Connection con)
+  {
+    return GatewaySenderBatchOp.executeOn(con, this.pool);
+  }
+}

Reply via email to