http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..c58e02c
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java
@@ -0,0 +1,42 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+
+import dunit.SerializableRunnable;
+
+/**
+ * Test class for testing {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag
+ *
+ */
+public class CqQueryUsingPoolOptimizedExecuteDUnitTest extends 
CqQueryUsingPoolDUnitTest{
+
+  public CqQueryUsingPoolOptimizedExecuteDUnitTest(String name) {
+    super(name);
+   }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+  
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+      }
+    });
+    super.tearDown2();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java
new file mode 100644
index 0000000..e7f3e0e
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java
@@ -0,0 +1,1131 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.index.IndexManager;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContinuousQuery mechanism in GemFire.
+ *
+ * @author anil
+ */
+public class CqResultSetUsingPoolDUnitTest extends CacheTestCase {
+
+  /**
+   * Helper instance of {@link CqQueryUsingPoolDUnitTest}; the tests in this
+   * class call its server, pool, client and CQ utility methods.
+   */
+  protected CqQueryUsingPoolDUnitTest cqDUnitTest =
+      new CqQueryUsingPoolDUnitTest("CqResultSetUsingPoolDUnitTest");
+
+  /** Query prefix to which an entry of the condition arrays is appended. */
+  private final String selStr = "SELECT * FROM /root/regionA";
+  
+  /** Supported queries */
+  public final String[] condition = new String [] {
+    /* 0  */ " p WHERE p.ID > 3",
+    /* 1  */ " p WHERE p.ID < 3",
+    /* 2  */ "   WHERE   ID = 3",
+    /* 3  */ " p WHERE p.ID >= 3",
+    /* 4  */ " p WHERE p.ID <= 3",
+    /* 5  */ " p WHERE p.ID > 3 AND p.status = 'active'",
+    /* 6  */ "   WHERE   status = 'active' AND ID < 3",
+    /* 7  */ " p WHERE p.names[0] = 'aaa'",
+    /* 8  */ " p WHERE p.status LIKE 'active'",
+    /* 9  */ " p WHERE p.collectionHolderMap.get('1').arr[0] = '0'",
+    /* 10 */ " p WHERE p.position1.portfolioId > 3",
+    /* 11 */ " p where p.position3[1].portfolioId = 2",
+    /* 12 */ " p where NOT(SELECT DISTINCT * FROM positions.values pos " +
+             " WHERE pos.secId in SET('YHOO', 'SUN', 'IBM', 'YHOO', 'GOOG', " +
+             " 'MSFT', 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 
'HP')).isEmpty",
+    /* 13  */ " p WHERE p.ID != 3 AND p.ID !=4",
+    /* 14  */ " p WHERE (p.ID = 2 OR p.ID = 4) AND p.status = 'active'",
+
+  };
+
+  /**
+   * For an initial size of 5. Entry {@code i} is passed to
+   * {@code executeCQ} together with {@code condition[i]} and
+   * {@code expectedKeys[i]}; presumably the number of results the CQ's
+   * initial execution should return (every entry equals the length of the
+   * matching {@code expectedKeys} row).
+   */
+  public final int[] resultSize = new int [] {
+    /* 0  */ 2,
+    /* 1  */ 2,
+    /* 2  */ 1,
+    /* 3  */ 3,
+    /* 4  */ 3,
+    /* 5  */ 1,
+    /* 6  */ 1,
+    /* 7  */ 5,
+    /* 8  */ 2,
+    /* 9  */ 5,
+    /* 10 */ 2,
+    /* 11 */ 5,
+    /* 12 */ 5,
+    /* 13 */ 3,
+    /* 14 */ 2,
+  };
+
+  /**
+   * For an initial size of 5. Row {@code i} is passed to {@code executeCQ}
+   * together with {@code condition[i]} and {@code resultSize[i]};
+   * presumably the region keys the CQ's initial result is expected to
+   * contain.
+   */
+  public final String[][] expectedKeys = new String [][] {
+    /* 0  */ {"key-4", "key-5"},
+    /* 1  */ {"key-1", "key-2"},
+    /* 2  */ {"key-3"},
+    /* 3  */ {"key-3", "key-4", "key-5"},
+    /* 4  */ {"key-1", "key-2", "key-3"},
+    /* 5  */ {"key-4"},
+    /* 6  */ {"key-2"},
+    /* 7  */ {"key-1", "key-2", "key-3","key-4", "key-5"},
+    /* 8  */ {"key-2", "key-4"},
+    /* 9  */ {"key-1", "key-2", "key-3","key-4", "key-5"},
+    /* 10 */ {"key-4", "key-5"},
+    /* 11 */ {"key-1", "key-2", "key-3","key-4", "key-5"},
+    /* 12 */ {"key-1", "key-2", "key-3","key-4", "key-5"},
+    /* 13 */ {"key-1", "key-2", "key-5"},
+    /* 14  */ {"key-2", "key-4"},
+  };
+
+
+  
+  /** Unsupported queries */
+  public final String[] condition2 = new String [] {
+    /* 0  */ " p1, /root/regionB p2 WHERE p1.status = p2.status",
+    /* 1  */ " p, p.positions.values p1 WHERE p1.secId = 'IBM'",
+    /* 2  */ " p, p.positions.values AS pos WHERE pos.secId != '1'",
+    /* 3  */ " p WHERE p.ID in (SELECT p1.ID FROM /root/regionA p1 WHERE p1.ID 
> 3)",
+    /* 4  */ ".entries entry WHERE entry.key = '1'",
+    /* 5  */ ".entries entry WHERE entry.value.ID > '3'",
+    /* 6  */ ".values p WHERE p.ID > '3' and p.status = 'active'",
+    /* 7  */ " p, p.position3 pos where pos.portfolioId  = 1",
+  };
+  
+
+  /**
+   * Creates the test case.
+   *
+   * @param name passed unchanged to the superclass constructor
+   */
+  public CqResultSetUsingPoolDUnitTest(String name) {
+    super(name);
+  }
+  
+  /**
+   * Runs the inherited set-up, then calls {@code getSystem()} locally and,
+   * through {@code invokeInEveryVM}, remotely.
+   *
+   * @throws Exception if the inherited set-up or a remote invocation fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // Avoid IllegalStateException from HandShake by connecting all VMs to
+    // the system before creating ConnectionPools.
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+  }
+  
+  
+  /**
+   * Tests the CQ result set.
+   *
+   * <p>Starts a server with {@code cqDUnitTest.createServer}, creates a pool
+   * and a client against it, and puts 5 entries into {@code regionA}. For
+   * every entry of {@code condition} a CQ is created and executed with the
+   * matching {@code resultSize} and {@code expectedKeys}. For every entry
+   * of {@code condition2} the CQ creation itself is expected to throw.
+   *
+   * @throws Exception if any set-up step or a supported CQ fails
+   */
+  public void testCqResults() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    final String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // Put 5 entries into the region.
+    cqDUnitTest.createValues(server, "regionA", 5);
+
+    // Test for supported queries.
+    for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
+      final String cqName = "testCqResultsP_" + queryCnt;
+      final String cqQuery = selStr + condition[queryCnt];
+      cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+      cqDUnitTest.executeCQ(client, cqName, true, resultSize[queryCnt],
+          expectedKeys[queryCnt], null);
+    }
+
+    // Test unsupported queries: creating the CQ must already throw.
+    for (int queryCnt = 0; queryCnt < condition2.length; queryCnt++) {
+      final String cqName = "testCqResultsF_" + queryCnt;
+      final String cqQuery = selStr + condition2[queryCnt];
+
+      try {
+        cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+        // fail() is only reached when createCQ did not throw. The catch
+        // below is for Exception, so it is not meant to intercept the
+        // assertion failure raised here.
+        fail("UnSupported CQ Query, Expected to fail. " +
+            " CQ :" + cqName +
+            " Query : " + cqQuery);
+      } catch (Exception ex) {
+        // Expected.
+      }
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+
+  /**
+   * Tests CQ Result Set with Compact Range Index.
+   * 
+   * @throws Exception
+   */
+  public void testCqResultsWithCompactRangeIndex() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // Create index.
+    cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID", 
"/root/regionA p");
+    cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status", 
"/root/regionA p");
+    cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex", 
"p.position1.portfolioId", "/root/regionA p");
+    
+    // Put 5 entries into the region.
+    cqDUnitTest.createValues(server, "regionA", 5);
+    
+    // Test for supported queries.
+    String cqQuery = "";
+    for (int queryCnt=0; queryCnt < condition.length; queryCnt++) {
+      cqQuery = selStr + condition[queryCnt];
+      cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, 
cqQuery);
+      cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, 
resultSize[queryCnt], expectedKeys[queryCnt], null);
+    }
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server); 
+  }
+
+  /**
+   * Tests the CQ result set with {@code IndexManager.TEST_RANGEINDEX_ONLY}
+   * set to {@code true} in the server VM while the indexes are created and
+   * the CQs are run.
+   *
+   * <p>The flag is a static field, so it is reset in a {@code finally}
+   * block: a failing CQ must not leave it set for later tests that run in
+   * the same VM.
+   *
+   * @throws Exception if any set-up step or CQ fails
+   */
+  public void testCqResultsWithRangeIndex() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    final String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    try {
+      // Set the flag before any index is created.
+      server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+        public void run2() throws CacheException {
+          IndexManager.TEST_RANGEINDEX_ONLY = true;
+        }
+      });
+
+      // Create index.
+      cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID",
+          "/root/regionA p");
+      cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status",
+          "/root/regionA p");
+      cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex",
+          "p.position1.portfolioId", "/root/regionA p");
+
+      // Put 5 entries into the region.
+      cqDUnitTest.createValues(server, "regionA", 5);
+
+      // Test for supported queries.
+      for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
+        final String cqName = "testCqResultsP_" + queryCnt;
+        final String cqQuery = selStr + condition[queryCnt];
+        cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+        cqDUnitTest.executeCQ(client, cqName, true, resultSize[queryCnt],
+            expectedKeys[queryCnt], null);
+      }
+    } finally {
+      // Unset the flag, whether or not the steps above succeeded.
+      server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+        public void run2() throws CacheException {
+          IndexManager.TEST_RANGEINDEX_ONLY = false;
+        }
+      });
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Tests the CQ result set against two servers started with
+   * {@code cqDUnitTest.createServerWithPR}.
+   *
+   * <p>The client pool points at the first server. After 5 entries are put
+   * into {@code regionA}, every entry of {@code condition} is run as a CQ
+   * and checked against {@code resultSize} and {@code expectedKeys}. For
+   * every entry of {@code condition2} the CQ creation itself is expected to
+   * throw.
+   *
+   * @throws Exception if any set-up step or a supported CQ fails
+   */
+  public void testCqResultsOnPR() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    final VM client = host.getVM(2);
+
+    cqDUnitTest.createServerWithPR(server1, 0, false, 0);
+    cqDUnitTest.createServerWithPR(server2, 0, false, 0);
+
+    final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    final String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // Put 5 entries into the region.
+    cqDUnitTest.createValues(server1, "regionA", 5);
+
+    // Test for supported queries.
+    for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
+      final String cqName = "testCqResultsP_" + queryCnt;
+      final String cqQuery = selStr + condition[queryCnt];
+      cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+      cqDUnitTest.executeCQ(client, cqName, true, resultSize[queryCnt],
+          expectedKeys[queryCnt], null);
+    }
+
+    // Test unsupported queries: creating the CQ must already throw.
+    for (int queryCnt = 0; queryCnt < condition2.length; queryCnt++) {
+      final String cqName = "testCqResultsF_" + queryCnt;
+      final String cqQuery = selStr + condition2[queryCnt];
+
+      try {
+        cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+        // fail() is only reached when createCQ did not throw. The catch
+        // below is for Exception, so it is not meant to intercept the
+        // assertion failure raised here.
+        fail("UnSupported CQ Query, Expected to fail. " +
+            " CQ :" + cqName +
+            " Query : " + cqQuery);
+      } catch (Exception ex) {
+        // Expected.
+      }
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Tests CQ Result Set with Compact Range Index.
+   * 
+   * @throws Exception
+   */
+  public void testCqResultsWithCompactRangeIndexOnPR() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+
+    cqDUnitTest.createServerWithPR(server1, 0, false, 0);
+    cqDUnitTest.createServerWithPR(server2, 0, false, 0);
+
+    final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // Create index.
+    cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID", 
"/root/regionA p");
+    cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status", 
"/root/regionA p");
+    cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex", 
"p.position1.portfolioId", "/root/regionA p");
+    
+    // Put 5 entries into the region.
+    cqDUnitTest.createValues(server1, "regionA", 5);
+    
+    // Test for supported queries.
+    String cqQuery = "";
+    for (int queryCnt=0; queryCnt < condition.length; queryCnt++) {
+      cqQuery = selStr + condition[queryCnt];
+      cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, 
cqQuery);
+      cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, 
resultSize[queryCnt], expectedKeys[queryCnt], null);
+    }
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1); 
+    cqDUnitTest.closeServer(server2); 
+
+  }
+
+  /**
+   * Tests the CQ result set with {@code IndexManager.TEST_RANGEINDEX_ONLY}
+   * set to {@code true} on both servers, which are started with
+   * {@code cqDUnitTest.createServerWithPR}.
+   *
+   * <p>The flag is a static field, so it is reset on both servers in a
+   * {@code finally} block: a failing CQ must not leave it set for later
+   * tests that run in the same VMs.
+   *
+   * @throws Exception if any set-up step or CQ fails
+   */
+  public void testCqResultsWithRangeIndexOnPR() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    final VM client = host.getVM(2);
+
+    cqDUnitTest.createServerWithPR(server1, 0, false, 0);
+    cqDUnitTest.createServerWithPR(server2, 0, false, 0);
+
+    final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    final String poolName = "testCqResults";
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    try {
+      // Set the flag on both servers before any index is created.
+      server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+        public void run2() throws CacheException {
+          IndexManager.TEST_RANGEINDEX_ONLY = true;
+        }
+      });
+
+      server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+        public void run2() throws CacheException {
+          IndexManager.TEST_RANGEINDEX_ONLY = true;
+        }
+      });
+
+      // Create index.
+      cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID",
+          "/root/regionA p");
+      cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status",
+          "/root/regionA p");
+      cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex",
+          "p.position1.portfolioId", "/root/regionA p");
+
+      // Put 5 entries into the region.
+      cqDUnitTest.createValues(server1, "regionA", 5);
+
+      // Test for supported queries.
+      for (int queryCnt = 0; queryCnt < condition.length; queryCnt++) {
+        final String cqName = "testCqResultsP_" + queryCnt;
+        final String cqQuery = selStr + condition[queryCnt];
+        cqDUnitTest.createCQ(client, poolName, cqName, cqQuery);
+        cqDUnitTest.executeCQ(client, cqName, true, resultSize[queryCnt],
+            expectedKeys[queryCnt], null);
+      }
+    } finally {
+      // Unset the flag on both servers. The nested finally makes sure the
+      // second server is reset even when the first invocation throws.
+      try {
+        server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+          public void run2() throws CacheException {
+            IndexManager.TEST_RANGEINDEX_ONLY = false;
+          }
+        });
+      } finally {
+        server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") {
+          public void run2() throws CacheException {
+            IndexManager.TEST_RANGEINDEX_ONLY = false;
+          }
+        });
+      }
+    }
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Tests the server-side CQ result key cache while the region is being
+   * modified concurrently.
+   *
+   * <p>The region is seeded with keys {@code "1"} to {@code "300"}. An
+   * asynchronous task then updates, destroys and re-creates entries and
+   * finally puts key {@code "500"}, while the client executes the CQ. Once
+   * key {@code "500"} is visible on the server, the CQ named
+   * {@code testCqResultsP_0} must report exactly 500 cached result keys.
+   *
+   * @throws Exception if any set-up step, the CQ, or the verification fails
+   */
+  public void testCqResultsCaching() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    final String poolName = "testCqResults";
+    final String cqName = "testCqResultsP_0";
+
+    cqDUnitTest.createPool(client, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
+
+    final int numObjects = 300;
+    final int totalObjects = 500;
+
+    // initialize Region.
+    server.invoke(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+      }
+    });
+
+    // Keep updating region (async invocation).
+    // NOTE(review): the result of invokeAsync is not checked, so an
+    // exception thrown by this task is not reported by the test.
+    server.invokeAsync(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        // Update (totalObjects - 1) entries.
+        for (int i = 1; i < totalObjects; i++) {
+          // Destroy entries.
+          if (i > 25 && i < 201) {
+            region.destroy("" + i);
+            continue;
+          }
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+        // recreate destroyed entries.
+        for (int j = 26; j < 201; j++) {
+          Portfolio p = new Portfolio(j);
+          region.put("" + j, p);
+        }
+        // Add the last key.
+        Portfolio p = new Portfolio(totalObjects);
+        region.put("" + totalObjects, p);
+      }
+    });
+
+    // Execute CQ.
+    // While region operation is in progress execute CQ.
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+
+    // Verify CQ Cache results.
+    server.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
+      public void run2() throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService) getCache().getQueryService())
+              .getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail("Failed to get the internal CqService.", ex);
+        }
+
+        // Wait till all the region update is performed: the async task
+        // puts the key for totalObjects last.
+        // NOTE(review): there is no upper bound on this wait.
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        final String lastKey = "" + totalObjects;
+        while (region.get(lastKey) == null) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException ie) {
+            // Restore the interrupt status and stop waiting, rather than
+            // silently looping on after an interrupt.
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for region updates.", ie);
+          }
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq : cqs) {
+          ServerCQImpl cqQuery = (ServerCQImpl) cq;
+          if (!cqQuery.getName().equals(cqName)) {
+            continue;
+          }
+          int size = cqQuery.getCqResultKeysSize();
+          if (size != totalObjects) {
+            getLogWriter().info("The number of Cached events " + size +
+                " is not equal to the expected size " + totalObjects);
+            // Keys 1..totalObjects inclusive are expected; whatever is
+            // left after removing the cached keys is missing.
+            Set<String> missingKeys = new HashSet<String>();
+            for (int i = 1; i <= totalObjects; i++) {
+              missingKeys.add("" + i);
+            }
+            missingKeys.removeAll(cqQuery.getCqResultKeyCache());
+            getLogWriter().info("Missing keys from the Cache : " +
+                missingKeys);
+          }
+          assertEquals(
+              "The number of keys cached for cq " + cqName + " is wrong.",
+              totalObjects, cqQuery.getCqResultKeysSize());
+        }
+      }
+    });
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Tests the server-side CQ result key cache when two clients each
+   * register a CQ with the same query ({@code cqDUnitTest.cqs[0]}) while
+   * the region is being modified concurrently.
+   *
+   * <p>The region is seeded with keys {@code "1"} to {@code "300"}. An
+   * asynchronous task then updates, destroys and re-creates entries and
+   * finally puts key {@code "500"}, while both clients execute their CQ.
+   * Once key {@code "500"} is visible on the server, every CQ returned by
+   * the server's CqService must report exactly 500 cached result keys.
+   *
+   * @throws Exception if any set-up step, a CQ, or the verification fails
+   */
+  public void testCqResultsCachingForMultipleCQs() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client1 = host.getVM(1);
+    final VM client2 = host.getVM(2);
+
+    cqDUnitTest.createServer(server);
+
+    final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    final String poolName = "testCqResults";
+    final String cqName1 = "testCqResultsP_0";
+    final String cqName2 = "testCqResultsP_1";
+
+    cqDUnitTest.createPool(client1, poolName, host0, port);
+    cqDUnitTest.createPool(client2, poolName, host0, port);
+
+    // Create client.
+    cqDUnitTest.createClient(client1, port, host0);
+    cqDUnitTest.createClient(client2, port, host0);
+
+    // create CQ.
+    cqDUnitTest.createCQ(client1, poolName, cqName1, cqDUnitTest.cqs[0]);
+    cqDUnitTest.createCQ(client2, poolName, cqName2, cqDUnitTest.cqs[0]);
+
+    final int numObjects = 300;
+    final int totalObjects = 500;
+
+    // initialize Region.
+    server.invoke(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+      }
+    });
+
+    // Keep updating region (async invocation).
+    // NOTE(review): the result of invokeAsync is not checked, so an
+    // exception thrown by this task is not reported by the test.
+    server.invokeAsync(new CacheSerializableRunnable("Update Region") {
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        // Update (totalObjects - 1) entries.
+        for (int i = 1; i < totalObjects; i++) {
+          // Destroy entries.
+          if (i > 25 && i < 201) {
+            region.destroy("" + i);
+            continue;
+          }
+          Portfolio p = new Portfolio(i);
+          region.put("" + i, p);
+        }
+        // recreate destroyed entries.
+        for (int j = 26; j < 201; j++) {
+          Portfolio p = new Portfolio(j);
+          region.put("" + j, p);
+        }
+        // Add the last key.
+        Portfolio p = new Portfolio(totalObjects);
+        region.put("" + totalObjects, p);
+      }
+    });
+
+    // Execute CQ.
+    // While region operation is in progress execute CQ.
+    cqDUnitTest.executeCQ(client1, cqName1, true, null);
+    cqDUnitTest.executeCQ(client2, cqName2, true, null);
+
+    // Verify CQ Cache results.
+    server.invoke(new CacheSerializableRunnable("Verify CQ Cache results") {
+      public void run2() throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService) getCache().getQueryService())
+              .getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail("Failed to get the internal CqService.", ex);
+        }
+
+        // Wait till all the region update is performed: the async task
+        // puts the key for totalObjects last.
+        // NOTE(review): there is no upper bound on this wait.
+        Region region = getCache().getRegion("/root/" +
+            cqDUnitTest.regions[0]);
+        final String lastKey = "" + totalObjects;
+        while (region.get(lastKey) == null) {
+          try {
+            Thread.sleep(50);
+          } catch (InterruptedException ie) {
+            // Restore the interrupt status and stop waiting, rather than
+            // silently looping on after an interrupt.
+            Thread.currentThread().interrupt();
+            fail("Interrupted while waiting for region updates.", ie);
+          }
+        }
+
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq : cqs) {
+          ServerCQImpl cqQuery = (ServerCQImpl) cq;
+          int size = cqQuery.getCqResultKeysSize();
+          if (size != totalObjects) {
+            getLogWriter().info("The number of Cached events " + size +
+                " is not equal to the expected size " + totalObjects);
+            // Keys 1..totalObjects inclusive are expected; whatever is
+            // left after removing the cached keys is missing.
+            Set<String> missingKeys = new HashSet<String>();
+            for (int i = 1; i <= totalObjects; i++) {
+              missingKeys.add("" + i);
+            }
+            missingKeys.removeAll(cqQuery.getCqResultKeyCache());
+            getLogWriter().info("Missing keys from the Cache : " +
+                missingKeys);
+          }
+          assertEquals(
+              "The number of keys cached for cq " + cqQuery.getName() +
+                  " is wrong.",
+              totalObjects, cqQuery.getCqResultKeysSize());
+        }
+      }
+    });
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Tests CQ Result Set.
+   * 
+   * @throws Exception
+   */
+  public void testCqResultsCachingForPR() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+
+    cqDUnitTest.createServerWithPR(server1, 0, false, 0);
+    cqDUnitTest.createServerWithPR(server2, 0, false, 0);
+    
+    final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    String poolName = "testCqResults";
+    final String cqName = "testCqResultsP_0";
+    
+    cqDUnitTest.createPool(client, poolName, host0, port);
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
+    
+    final int numObjects = 300;
+    final int totalObjects = 500;
+    
+    // initialize Region.
+    server1.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+    
+    // Keep updating region (async invocation).
+    server2.invokeAsync(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        // Update (totalObjects - 1) entries.
+        for (int i = 1; i < totalObjects; i++) {
+          // Destroy entries.
+          if (i > 25 && i < 201) {
+            region.destroy(""+i);
+            continue;
+          }
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+        // recreate destroyed entries.
+        for (int j = 26; j < 201; j++) {
+          Portfolio p = new Portfolio(j);
+          region.put(""+j, p);
+        }
+        // Add the last key.
+        Portfolio p = new Portfolio(totalObjects);
+        region.put(""+totalObjects, p);
+      }
+    });
+
+    // Execute CQ.
+    // While region operation is in progress execute CQ.
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+    
+    // Verify CQ Cache results.
+    server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        
+        // Wait till all the region update is performed.
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        while(true){
+          if (region.get(""+ totalObjects) == null){
+            try {
+              Thread.sleep(50);
+            } catch (Exception ex){
+              //ignore.
+            }
+            continue;
+          }
+          break;
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getCqResultKeysSize() <= 0) {
+            fail("The Result Cache for CQ on PR is not working. CQ :" + 
cqName);              
+          }
+        }
+      }
+    });
+    
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+  /**
+   * Tests CQ Result Set caching for destroy events.
+   * 
+   * @throws Exception
+   */
+  public void testCqResultsCachingForDestroyEventsOnPR() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+
+    cqDUnitTest.createServerWithPR(server1, 0, false, 0);
+    cqDUnitTest.createServerWithPR(server2, 0, false, 0);
+    
+    final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+
+    String poolName = "testCqResults";
+    final String cqName = "testCqResultsCachingForDestroyEventsOnPR_0";
+    
+    cqDUnitTest.createPool(client, poolName, host0, port);
+    
+    // Create client.
+    cqDUnitTest.createClient(client, port, host0);
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
+
+    // Execute CQ.
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+
+    final int numObjects = 50;
+    
+    // initialize Region.
+    server1.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+
+    // Update from server2.
+    server2.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+
+    // Destroy entries.
+    server2.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.destroy(""+i);
+        }
+      }
+    });
+    
+    // Wait for events to be sent.
+    cqDUnitTest.waitForDestroyed(client, cqName, "" + numObjects);
+    
+    // Verify CQ Cache results.
+    server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getCqResultKeysSize() > 0) {
+            fail("The CQ Result Cache on PR should have been empty for CQ :" + 
cqName + " keys=" + cqQuery.getCqResultKeyCache());              
+          }
+        }
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getCqResultKeysSize() > 0) {
+            fail("The CQ Result Cache on PR should have been empty for CQ :" + 
cqName);              
+          }
+        }
+      }
+    });
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server1);
+    cqDUnitTest.closeServer(server2);
+  }
+
+
+  /**
+   * Tests CQ Result Caching with CQ Failover.
+   * 
+   * @throws Exception
+   */
+  public void testCqResultsCachingWithFailOver() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    cqDUnitTest.createServer(server1);
+    
+    final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    String poolName = "testCQFailOver";
+    final String cqName = "testCQFailOver_0";
+    
+    cqDUnitTest.createPool(client, poolName, new String[] {host0, host0}, new 
int[] {port1, ports[0]});
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
+    
+    final int numObjects = 300;
+    final int totalObjects = 500;
+    
+    // initialize Region.
+    server1.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+    
+    // Keep updating region (async invocation).
+    server1.invokeAsync(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        // Update (totalObjects - 1) entries.
+        for (int i = 1; i < totalObjects; i++) {
+          // Destroy entries.
+          if (i > 25 && i < 201) {
+            region.destroy(""+i);
+            continue;
+          }
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+        // recreate destroyed entries.
+        for (int j = 26; j < 201; j++) {
+          Portfolio p = new Portfolio(j);
+          region.put(""+j, p);
+        }
+        // Add the last key.
+        Portfolio p = new Portfolio(totalObjects);
+        region.put(""+totalObjects, p);
+      }
+    });
+
+    // Execute CQ.
+    // While region operation is in progress execute CQ.
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+    
+    // Verify CQ Cache results.
+    server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        
+        // Wait till all the region update is performed.
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        while(true){
+          if (region.get(""+ totalObjects) == null){
+            try {
+              Thread.sleep(50);
+            } catch (Exception ex){
+              //ignore.
+            }
+            continue;
+          }
+          break;
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getName().equals(cqName)) {
+            int size = cqQuery.getCqResultKeysSize();
+            if (size != totalObjects) {
+              getLogWriter().info("The number of Cached events " + size + 
+                  " is not equal to the expected size " + totalObjects);
+              HashSet expectedKeys = new HashSet();
+              for (int i = 1; i < totalObjects; i++) {
+                expectedKeys.add("" + i);
+              }
+              Set cachedKeys = cqQuery.getCqResultKeyCache();
+              expectedKeys.removeAll(cachedKeys);
+              getLogWriter().info("Missing keys from the Cache : " + 
expectedKeys);
+            }
+            assertEquals("The number of keys cached for cq " + cqName + " is 
wrong.", 
+                totalObjects, cqQuery.getCqResultKeysSize());              
+          }
+        }
+      }
+    });
+    
+    cqDUnitTest.createServer(server2, ports[0]);
+    final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, 
"getCacheServerPort");
+    System.out.println("### Port on which server1 running : " + port1 + 
+        " Server2 running : " + thePort2);
+    pause(3 * 1000);
+    
+    // Close server1 for CQ fail over to server2.
+    cqDUnitTest.closeServer(server1); 
+    pause(3 * 1000);
+    
+    // Verify CQ Cache results.
+    server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqService cqService = null;
+        try {
+          cqService = 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqService.", ex);
+          fail ("Failed to get the internal CqService.", ex);
+        }
+        
+        // Wait till all the region update is performed.
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        while(true){
+          if (region.get(""+ totalObjects) == null){
+            try {
+              Thread.sleep(50);
+            } catch (Exception ex){
+              //ignore.
+            }
+            continue;
+          }
+          break;
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getName().equals(cqName)) {
+            int size = cqQuery.getCqResultKeysSize();
+            if (size != totalObjects) {
+              getLogWriter().info("The number of Cached events " + size + 
+                  " is not equal to the expected size " + totalObjects);
+              HashSet expectedKeys = new HashSet();
+              for (int i = 1; i < totalObjects; i++) {
+                expectedKeys.add("" + i);
+              }
+              Set cachedKeys = cqQuery.getCqResultKeyCache();
+              expectedKeys.removeAll(cachedKeys);
+              getLogWriter().info("Missing keys from the Cache : " + 
expectedKeys);
+            }
+            assertEquals("The number of keys cached for cq " + cqName + " is 
wrong.", 
+                totalObjects, cqQuery.getCqResultKeysSize());              
+          }
+        }
+      }
+    });    
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server2); 
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..6d465f0
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java
@@ -0,0 +1,223 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+public class CqResultSetUsingPoolOptimizedExecuteDUnitTest extends 
CqResultSetUsingPoolDUnitTest{
+
+  public CqResultSetUsingPoolOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+  
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+      }
+    });
+    super.tearDown2();
+  }
+  
+  /**
+   * Tests CQ Result Caching with CQ Failover.
+   * When EXECUTE_QUERY_DURING_INIT is false and new server calls 
+   * execute during HA the results cache is not initialized.
+   * @throws Exception
+   */
+  @Override
+  public void testCqResultsCachingWithFailOver() throws Exception
+  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    cqDUnitTest.createServer(server1);
+    
+    final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1);
+    
+    String poolName = "testCQFailOver";
+    final String cqName = "testCQFailOver_0";
+    
+    cqDUnitTest.createPool(client, poolName, new String[] {host0, host0}, new 
int[] {port1, ports[0]});
+
+    // create CQ.
+    cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]);
+    
+    final int numObjects = 300;
+    final int totalObjects = 500;
+    
+    // initialize Region.
+    server1.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+    
+    // Keep updating region (async invocation).
+    server1.invokeAsync(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        // Update (totalObjects - 1) entries.
+        for (int i = 1; i < totalObjects; i++) {
+          // Destroy entries.
+          if (i > 25 && i < 201) {
+            region.destroy(""+i);
+            continue;
+          }
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+        // recreate destroyed entries.
+        for (int j = 26; j < 201; j++) {
+          Portfolio p = new Portfolio(j);
+          region.put(""+j, p);
+        }
+        // Add the last key.
+        Portfolio p = new Portfolio(totalObjects);
+        region.put(""+totalObjects, p);
+      }
+    });
+
+    // Execute CQ.
+    // While region operation is in progress execute CQ.
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+    
+    // Verify CQ Cache results.
+    server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqServiceImpl CqServiceImpl = null;
+        try {
+          CqServiceImpl = 
(com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl) 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqServiceImpl.", ex);
+          fail ("Failed to get the internal CqServiceImpl.", ex);
+        }
+        
+        // Wait till all the region update is performed.
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        while(true){
+          if (region.get(""+ totalObjects) == null){
+            try {
+              Thread.sleep(50);
+            } catch (Exception ex){
+              //ignore.
+            }
+            continue;
+          }
+          break;
+        }
+        Collection<? extends InternalCqQuery> cqs = CqServiceImpl.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getName().equals(cqName)) {
+            int size = cqQuery.getCqResultKeysSize();
+            if (size != totalObjects) {
+              getLogWriter().info("The number of Cached events " + size + 
+                  " is not equal to the expected size " + totalObjects);
+              HashSet expectedKeys = new HashSet();
+              for (int i = 1; i < totalObjects; i++) {
+                expectedKeys.add("" + i);
+              }
+              Set cachedKeys = cqQuery.getCqResultKeyCache();
+              expectedKeys.removeAll(cachedKeys);
+              getLogWriter().info("Missing keys from the Cache : " + 
expectedKeys);
+            }
+            assertEquals("The number of keys cached for cq " + cqName + " is 
wrong.", 
+                totalObjects, cqQuery.getCqResultKeysSize());              
+          }
+        }
+      }
+    });
+    
+    cqDUnitTest.createServer(server2, ports[0]);
+    final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, 
"getCacheServerPort");
+    System.out.println("### Port on which server1 running : " + port1 + 
+        " Server2 running : " + thePort2);
+    pause(3 * 1000);
+    
+    // Close server1 for CQ fail over to server2.
+    cqDUnitTest.closeServer(server1); 
+    pause(3 * 1000);
+    
+    // Verify CQ Cache results.
+    server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){
+      public void run2()throws CacheException {
+        CqServiceImpl CqServiceImpl = null;
+        try {
+          CqServiceImpl = (CqServiceImpl) 
((DefaultQueryService)getCache().getQueryService()).getCqService();
+        } catch (Exception ex) {
+          getLogWriter().info("Failed to get the internal CqServiceImpl.", ex);
+          fail ("Failed to get the internal CqServiceImpl.", ex);
+        }
+        
+        // Wait till all the region update is performed.
+        Region region = getCache().getRegion("/root/" + 
cqDUnitTest.regions[0]);
+        while(true){
+          if (region.get(""+ totalObjects) == null){
+            try {
+              Thread.sleep(50);
+            } catch (Exception ex){
+              //ignore.
+            }
+            continue;
+          }
+          break;
+        }
+        Collection<? extends InternalCqQuery> cqs = CqServiceImpl.getAllCqs();
+        for (InternalCqQuery cq: cqs){
+          ServerCQImpl cqQuery = (ServerCQImpl)cq;
+          if (cqQuery.getName().equals(cqName)) {
+            int size = cqQuery.getCqResultKeysSize();
+            assertEquals("The number of keys cached for cq " + cqName + " is 
wrong.", 
+                0, size);              
+          }
+        }
+      }
+    });    
+        
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server2); 
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
new file mode 100644
index 0000000..29ed0a4
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java
@@ -0,0 +1,119 @@
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Properties;
+
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.dunit.HelperTestCase;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.SerializableCallable;
+import dunit.VM;
+
+public class CqStateDUnitTest extends HelperTestCase {
+
+  
+  public CqStateDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void testNothingBecauseBug51953() {
+    // remove when bug #51953 is fixed
+  }
+  
+  // this test is disabled due to a 25% failure rate in
+  // CI testing.  See internal ticket #52229
+  public void disabledtestBug51222() throws Exception {
+    //The client can log this when the server shuts down.
+    addExpectedException("Could not find any server");
+    addExpectedException("java.net.ConnectException");
+    final String cqName = "theCqInQuestion";
+    final String regionName = "aattbbss";
+    final Host host = Host.getHost(0);
+    VM serverA = host.getVM(1);
+    VM serverB = host.getVM(2);
+    VM client = host.getVM(3);
+    
+    final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
+    startCacheServer(serverA, ports[0], getAuthenticatedServerProperties());
+    createReplicatedRegion(serverA, regionName, null);
+
+    final String host0 = getServerHostName(serverA.getHost());
+    startClient(client, new VM[]{ serverA, serverB }, ports, 1, 
getClientProperties());
+    createCQ(client, cqName, "select * from /"+ regionName, null);
+    
+    //create the cacheserver but regions must be present first or else cq 
execute will fail with no region found
+    createCacheServer(serverB, ports[1], getServerProperties(0));
+    createReplicatedRegion(serverB, regionName, null);
+    startCacheServers(serverB);
+    
+    AsyncInvocation async = executeCQ(client, cqName);
+    DistributedTestCase.join(async, 10000, getLogWriter());
+
+    Boolean clientRunning = (Boolean) client.invoke(new SerializableCallable() 
{
+      @Override
+      public Object call() throws Exception {
+        final CqQuery cq = getCache().getQueryService().getCq(cqName);
+        waitForCriterion(new WaitCriterion() {
+          @Override
+          public boolean done() {
+            return cq.getState().isRunning();
+          }
+          @Override
+          public String description() {
+            return "waiting for Cq to be in a running state: " + cq;
+          }
+        },  30000, 1000, false);
+        return cq.getState().isRunning();
+      }
+    });
+    assertTrue("Client was not running", clientRunning);
+    
+    //hope that server 2 comes up before num retries is exhausted by the 
execute cq command
+    //hope that the redundancy satisfier sends message and is executed after 
execute cq has been executed
+    //This is the only way bug 51222 would be noticed
+    //verify that the cq on the server is still in RUNNING state;
+    Boolean isRunning = (Boolean) serverB.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        CqQuery cq = getCache().getQueryService().getCqs()[0];
+        return cq.getState().isRunning();
+      }
+      
+    });
+    
+    assertTrue("Cq was not running on server" , isRunning);
+  }
+  
+  public Properties getAuthenticatedServerProperties() {
+    Properties props = new Properties();
+    props.put("mcast-port", "0");
+    props.put("security-client-accessor",
+        
"com.gemstone.gemfire.cache.query.dunit.CloseCacheAuthorization.create");
+    props.put("security-client-accessor-pp",
+        
"com.gemstone.gemfire.cache.query.dunit.CloseCacheAuthorization.create");
+    props.put("security-client-authenticator",
+        "templates.security.DummyAuthenticator.create");
+    return props;
+  }
+  
+  public Properties getServerProperties() {
+    Properties props = new Properties();
+    props.put("mcast-port", "0");
+    return props;
+  }
+  
+  public Properties getClientProperties() {
+    Properties props = new Properties();
+    props.put("security-client-auth-init",
+        "templates.security.UserPasswordAuthInit.create");
+    
+    props.put("security-username", "root");
+    props.put("security-password", "root");
+    return props;
+  }  
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java
new file mode 100644
index 0000000..b66228e
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java
@@ -0,0 +1,433 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import java.util.Collection;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.CqServiceStatistics;
+import com.gemstone.gemfire.cache.query.CqStatistics;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats;
+import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
+import com.gemstone.gemfire.cache.query.internal.DefaultQueryService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceVsdStats;
+import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * This class tests the ContinuousQuery mechanism in GemFire.
+ * This includes the test with different data activities.
+ *
+ * @author Rao
+ */
+public class CqStatsDUnitTest extends CacheTestCase {
+
+  /**
+   * Helper test instance; the tests in this class call its utility methods
+   * (createServer, createClient, createCQ, executeCQ, validateCQ, ...).
+   */
+  private CqQueryDUnitTest cqDUnitTest = new 
CqQueryDUnitTest("CqStatsDUnitTest");
+  
+  /**
+   * Creates the test case.
+   *
+   * @param name passed through unchanged to the superclass constructor
+   */
+  public CqStatsDUnitTest(String name) {
+    super(name);
+  }
+  
+  /**
+   * Runs the superclass set-up and then calls {@code getSystem()} in this VM
+   * and in every other VM.
+   *
+   * @throws Exception if the superclass set-up or a remote invocation fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    // avoid IllegalStateException from HandShake by connecting all vms to
+    // system before creating pool
+    getSystem();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        getSystem();
+      }
+    });
+  }
+  
+  public void validateCQStats(VM vm, final String cqName,
+      final int creates,
+      final int updates,
+      final int deletes,
+      final int totalEvents,
+      final int cqListenerInvocations) {
+    vm.invoke(new CacheSerializableRunnable("Validate CQs") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ Stats. ### " + cqName);
+//      Get CQ Service.
+        QueryService qService = null;
+        try {          
+          qService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to get query service.");
+        }
+        
+        CqService cqService = null;
+        try {
+          cqService = ((DefaultQueryService)qService).getCqService();
+        }
+        catch (CqException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+          fail("Failed to get CqService, CQ : " + cqName);
+        }
+        Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs();
+        if (cqs.size() == 0) {
+          fail("Failed to get CqQuery for CQ : " + cqName);
+        }
+        CqQueryImpl cQuery = (CqQueryImpl) cqs.iterator().next();
+        
+        CqStatistics cqStats = cQuery.getStatistics();
+        CqQueryVsdStats cqVsdStats = cQuery.getVsdStats();
+        if (cqStats == null || cqVsdStats == null) {
+          fail("Failed to get CqQuery Stats for CQ : " + cqName);
+        }
+        
+        getCache().getLogger().info("#### CQ stats for " + cQuery.getName() + 
": " + 
+            " Events Total: " + cqStats.numEvents() +
+            " Events Created: " + cqStats.numInserts() +
+            " Events Updated: " + cqStats.numUpdates() +
+            " Events Deleted: " + cqStats.numDeletes() +
+            " CQ Listener invocations: " + 
cqVsdStats.getNumCqListenerInvocations() +
+            " Initial results time (nano sec): " + 
cqVsdStats.getCqInitialResultsTime());
+        
+        
+//      Check for totalEvents count.
+        if (totalEvents != CqQueryDUnitTest.noTest) {
+//        Result size validation.
+          assertEquals("Total Event Count mismatch", totalEvents, 
cqStats.numEvents());
+        }
+        
+//      Check for create count.
+        if (creates != CqQueryDUnitTest.noTest) {
+          assertEquals("Create Event mismatch", creates, cqStats.numInserts());
+        }
+        
+//      Check for update count.
+        if (updates != CqQueryDUnitTest.noTest) {
+          assertEquals("Update Event mismatch", updates, cqStats.numUpdates());
+        }
+        
+//      Check for delete count.
+        if (deletes != CqQueryDUnitTest.noTest) {
+          assertEquals("Delete Event mismatch", deletes, cqStats.numDeletes());
+        }
+        
+//      Check for CQ listener invocations.
+        if (cqListenerInvocations != CqQueryDUnitTest.noTest) {
+          assertEquals("CQ Listener invocations mismatch", 
cqListenerInvocations, cqVsdStats.getNumCqListenerInvocations());
+        }
+////      Check for initial results time.
+//        if (initialResultsTime != CqQueryDUnitTest.noTest && 
cqVsdStats.getCqInitialResultsTime() <= 0) {
+//          assertEquals("Initial results time mismatch", initialResultsTime, 
cqVsdStats.getCqInitialResultsTime());
+//        }
+      }
+    });
+  }
+  
+  private void validateCQServiceStats(VM vm,
+      final int created,
+      final int activated,
+      final int stopped,
+      final int closed,
+      final int cqsOnClient,
+      final int cqsOnRegion,
+      final int clientsWithCqs) {
+    vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Validating CQ Service Stats. ### ");
+//      Get CQ Service.
+        QueryService qService = null;
+        try {          
+          qService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        CqServiceStatistics cqServiceStats = null;        
+        cqServiceStats = qService.getCqStatistics();
+        CqServiceVsdStats cqServiceVsdStats = null;
+        try {
+          cqServiceVsdStats = ((CqServiceImpl) 
((DefaultQueryService)qService).getCqService()).stats;
+        }
+        catch (CqException e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+        if (cqServiceStats == null) {
+          fail("Failed to get CQ Service Stats");
+        }
+        
+        getCache().getLogger().info("#### CQ Service stats: " + 
+            " CQs created: " + cqServiceStats.numCqsCreated() +
+            " CQs active: " + cqServiceStats.numCqsActive() +
+            " CQs stopped: " + cqServiceStats.numCqsStopped() +
+            " CQs closed: " + cqServiceStats.numCqsClosed() +
+            " CQs on Client: " + cqServiceStats.numCqsOnClient() +
+            " CQs on region /root/regionA : " + 
cqServiceVsdStats.numCqsOnRegion("/root/regionA") +
+            " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs());
+        
+        
+        //        Check for created count.
+        if (created != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs created mismatch", created, 
cqServiceStats.numCqsCreated());
+        }
+        
+        //        Check for activated count.
+        if (activated != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs activated mismatch", activated, 
cqServiceStats.numCqsActive());
+        }
+        
+        //        Check for stopped count.
+        if (stopped != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs stopped mismatch", stopped, 
cqServiceStats.numCqsStopped());
+        }
+        
+        //        Check for closed count.
+        if (closed != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs closed mismatch", closed, 
cqServiceStats.numCqsClosed());
+        }
+        
+        // Check for CQs on client count.
+        if (cqsOnClient != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs on client mismatch", cqsOnClient, 
cqServiceStats.numCqsOnClient());
+        }
+        
+        // Check for CQs on region.
+        if (cqsOnRegion != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs on region /root/regionA mismatch", 
+              cqsOnRegion, cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+        }
+        
+        // Check for clients with CQs count.
+        if (clientsWithCqs != CqQueryDUnitTest.noTest) {
+          assertEquals("Clints with CQs mismatch", 
+              clientsWithCqs, cqServiceVsdStats.getNumClientsWithCqs());
+        }
+      }
+    });
+  }
+  
+  /**
+   * Value handed to {@code pause(...)} between test steps so that statistics
+   * can settle before they are validated. Presumably milliseconds (8 seconds);
+   * confirm against {@code pause()}.
+   */
+  private static final int PAUSE = 8 * 1000; // 5 * 1000
+  
+  /**
+   * Test for CQ and CQ Service Statistics.
+   * <p>
+   * Registers one CQ on a client, creates, updates and destroys entries on
+   * the server, and after each step validates the CQ event statistics on
+   * both the client and the server.
+   *
+   * @throws Exception if any set-up, remote invocation or validation step fails
+   */
+  public void testCQStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+    final String cqName = "testCQStatistics_0";
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client, port, host0);
+
+    /* Create CQs. */
+    cqDUnitTest.createCQ(client, cqName, cqDUnitTest.cqs[0]);
+
+    /* Init values at server. */
+    final int size = 100;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    cqDUnitTest.executeCQ(client, cqName, true, null);
+
+    // Wait for CQ to be executed.
+    cqDUnitTest.waitForCqState(client, cqName, CqStateImpl.RUNNING);
+
+    // Test CQ stats
+    validateCQStats(client, cqName, 0, 0, 0, 0, 0);
+
+    // The stat would have not updated yet.
+    // Commenting out the following check; the check for resultset initialization
+    // is anyway done in the next validation.
+    // validateCQStats(server, "testCQStatistics_0", 0, 0, 0, 0, CqQueryDUnitTest.noTest, 1);
+
+    /* Init values at server. */
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 200);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client, cqName, CqQueryDUnitTest.KEY+200);
+    pause(PAUSE);
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client, cqName,
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ 100,
+        /* updates: */ 100,
+        /* deletes; */ 0,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ 200);
+
+    // Test CQ stats
+    validateCQStats(client, cqName, 100, 100, 0, 200, 200);
+    //We don't have serverside CQ name
+    validateCQStats(server, cqName, 100, 100, 0, 200, CqQueryDUnitTest.noTest);
+
+    /* Delete values at server. */
+    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 100);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForDestroyed(client, cqName, CqQueryDUnitTest.KEY+100);
+    pause(PAUSE);
+
+    cqDUnitTest.validateCQ(client, cqName,
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */100,
+        /* updates: */ 100,
+        /* deletes; */ 100,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 100,
+        /* totalEvents: */ 300);
+
+    // Test CQ stats
+    validateCQStats(client, cqName, 100, 100, 100, 300, 300);
+    //We don't have serverside CQ name
+    validateCQStats(server, cqName, 100, 100, 100, 300, CqQueryDUnitTest.noTest);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client, cqName);
+    pause(PAUSE);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+  /**
+   * Test for CQ Service Statistics.
+   * <p>
+   * Registers one CQ on each of two clients and validates the CQ service
+   * statistics on the clients and the server after each lifecycle step:
+   * create, execute, data load, unnamed CQs, close, stop and final close.
+   *
+   * @throws Exception if any set-up, remote invocation or validation step fails
+   */
+  public void testCQServiceStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client1, port, host0);
+    cqDUnitTest.createClient(client2, port, host0);
+
+    /* Create CQs. */
+    final String cqName = "testCQServiceStatistics_0";
+    final String cqName10 = "testCQServiceStatistics_10";
+    cqDUnitTest.createCQ(client1, cqName, cqDUnitTest.cqs[0]);
+    cqDUnitTest.createCQ(client2, cqName10, cqDUnitTest.cqs[2]);
+    pause(PAUSE);
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #1");
+    validateCQServiceStats(client1, 1, 0, 1, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(server, 0, 0, 0, 0, CqQueryDUnitTest.noTest, 0, 0);
+
+    cqDUnitTest.executeCQ(client1, cqName, false, null);
+    cqDUnitTest.executeCQ(client2, cqName10, false, null);
+    pause(PAUSE);
+
+    getCache().getLogger().info("Validating CQ Service stats on clients: #2");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryDUnitTest.noTest,
+        CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryDUnitTest.noTest, 1, 2);
+
+    /* Init values at server. */
+    final int size = 10;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client1, cqName, CqQueryDUnitTest.KEY+size);
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client1, cqName,
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #3");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryDUnitTest.noTest,
+        CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryDUnitTest.noTest, 1, 2);
+
+    //Create CQs with no name, execute, and close.
+    cqDUnitTest.createAndExecCQNoName(client1, cqDUnitTest.cqs[0]);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #4");
+    validateCQServiceStats(client1, 21, 1, 0, 20, 1, 1, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #2");
+    validateCQServiceStats(server, 22, 2, 0, 20, CqQueryDUnitTest.noTest, 1, 2);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client1, cqName);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #5");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #3");
+    validateCQServiceStats(server, 22, 1, 0, 21, CqQueryDUnitTest.noTest, 0, 1);
+
+    //Test stop CQ
+    cqDUnitTest.stopCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #6");
+    validateCQServiceStats(client2, 1, 0, 1, 0, 1, CqQueryDUnitTest.noTest,
+        CqQueryDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #4");
+    validateCQServiceStats(server, 22, 0, 1, 21, CqQueryDUnitTest.noTest,
+        CqQueryDUnitTest.noTest, 1);
+
+    // Test  CQ Close
+    cqDUnitTest.closeCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #7");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 0, 0, 1, 0, 0, CqQueryDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #5");
+    validateCQServiceStats(server, 22, 0, 0, 22, CqQueryDUnitTest.noTest, 0, 0);
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server);
+  }
+
+} 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..b7eaaa2
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
@@ -0,0 +1,43 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+
+import dunit.SerializableRunnable;
+
+/**
+ * Runs the tests inherited from {@link CqStatsDUnitTest} with the
+ * {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag set to {@code false}.
+ * <p>
+ * {@link #setUp()} clears the flag in every VM and {@link #tearDown2()} sets
+ * it back to {@code true}.
+ */
+public class CqStatsOptimizedExecuteDUnitTest extends CqStatsDUnitTest{
+
+  /**
+   * Creates the test case.
+   *
+   * @param name passed through unchanged to the superclass constructor
+   */
+  public CqStatsOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the superclass set-up, then sets
+   * {@code CqServiceImpl.EXECUTE_QUERY_DURING_INIT} to {@code false} in every VM.
+   *
+   * @throws Exception if the superclass set-up or a remote invocation fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+
+  /**
+   * Sets {@code CqServiceImpl.EXECUTE_QUERY_DURING_INIT} back to {@code true}
+   * in every VM and then runs the superclass tear-down.
+   *
+   * @throws Exception if the flag reset or the superclass tear-down fails
+   */
+  @Override
+  public void tearDown2() throws Exception {
+    try {
+      invokeInEveryVM(new SerializableRunnable("getSystem") {
+        public void run() {
+          CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+        }
+      });
+    } finally {
+      // Always run the superclass tear-down, even if resetting the flag failed.
+      super.tearDown2();
+    }
+  }
+}

Reply via email to