http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
new file mode 100644
index 0000000..1dbb1d7
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryDUnitTest.java
@@ -0,0 +1,1780 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import hydra.Log;
+
+import java.io.IOException;
+import java.util.HashSet;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.PartitionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+/**
+ * Test class for Partitioned Region and CQs
+ * 
+ * @author rdubey
+ * @since 5.5
+ */
+public class PartitionedRegionCqQueryDUnitTest extends CacheTestCase {
+
+  
+  public PartitionedRegionCqQueryDUnitTest(String name) {
+    super(name);
+  }
+  
+  static public final String[] regions = new String[] {
+      "regionA",
+      "regionB"
+  };
+  
+  static public final String KEY = "key-";
+  
+  protected final CqQueryDUnitTest cqHelper = new 
CqQueryDUnitTest("HelperPartitonedRegionCqTest");
+  
+  public final String[] cqs = new String [] {
+      //0 - Test for ">" 
+      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0",
+      
+      //1 -  Test for "=" and "and".
+      "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and 
p.status='active'",
+      
+      //2 -  Test for "<" and "and".
+      "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and 
p.status='active'",
+      
+      // FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST 
PARSING LOGIC WITHIN CQ.
+      //3
+      "SELECT * FROM /root/" + regions[0] + " ;",
+      //4
+      "SELECT ALL * FROM /root/" + regions[0],
+      //5
+      "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; " +
+      "SELECT ALL * FROM /root/" + regions[0] +  " TYPE Portfolio",
+      //6
+      "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; " +
+      "SELECT ALL * FROM /root/" + regions[0] +  " p TYPE Portfolio",
+      //7
+      "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and 
p.status='active';",
+      //8
+      "SELECT ALL * FROM /root/" + regions[0] + "  ;",
+      //9
+      "SELECT ALL * FROM /root/" + regions[0] +" p where p.description = NULL",
+      
+      // 10
+      "SELECT ALL * FROM /root/" + regions[1] +" p where p.ID > 0",
+  };
+  
+  public final String[] cqsWithoutRoot = new String [] {
+      //0 - Test for ">" 
+      "SELECT ALL * FROM /" + regions[0] + " p where p.ID > 0"
+      
+  };
+  
+  private static int bridgeServerPort;
+  
+  public void testCQLeakWithPartitionedRegion() throws Exception {
+    // creating servers.
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    createServer(server1);       
+    createServer(server2);    
+       
+    
+    // create client 
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    pause(2 * 1000);
+    
+    // create values
+    int size = 40;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    cqHelper.waitForCreated(client, "testCQEvents_0", KEY+size);
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    
+    int cc1 = server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCqCountFromRegionProfile");
+    int cc2 = server2.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCqCountFromRegionProfile");
+    assertEquals("Should have one", 1, cc1);
+    assertEquals("Should have one", 1, cc2);
+    
+    server1.bounce();
+    
+    cqHelper.closeClient(client);
+    pause(10 * 1000);
+    //cc1 = server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCqCountFromRegionProfile");
+    cc2 = server2.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCqCountFromRegionProfile");
+    
+    //assertEquals("Should have one", 0, cc1);
+    assertEquals("Should have one", 0, cc2);
+    
+    cqHelper.closeServer(server2);
+    //cqHelper.closeServer(server1);
+  }
+  
+  public void testCQAndPartitionedRegion() throws Exception {
+    // creating servers.
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    createServer(server1);
+    
+    createServer(server2);
+    
+    // create client 
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    pause(2 * 1000);
+    
+    // create values
+    int size = 40;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    cqHelper.waitForCreated(client, "testCQEvents_0", KEY+size);
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+size);
+    
+    // validate cqs again
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // destroy all the values.
+    int numDestroys = size;
+    cqHelper.deleteValues(server2,regions[0], numDestroys);
+    
+    cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+numDestroys);
+    
+    // validate cqs after destroyes on server2.
+ 
+        
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numDestroys,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numDestroys,
+        /* totalEvents: */ (size+size+numDestroys));
+    
+    // invalidate some entries.
+    /*final int numInvalidates = 5;
+    
+    server2.invoke(new CacheSerializableRunnable("Invalidate values") {
+      public void run2() throws CacheException {
+        Region region1 = getRootRegion().getSubregion(regions[0]);
+        for (int i = numInvalidates; i <= (numInvalidates+4); i++) {
+          region1.invalidate(KEY+i);
+        }
+      }
+    });
+     
+    cqHelper.waitForInvalidated(client, "testCQEvents_0", 
KEY+(numInvalidates+4));
+    */
+   // cqHelper.validateCQ(client, "testCQEvents_0",
+    //    /* resultSize: */ cqHelper.noTest,
+    //    /* creates: */ size,
+    //    /* updates: */ size,
+    //    /* deletes; */ (numDestroys+numInvalidates),
+    //    /* queryInserts: */ size,
+     //   /* queryUpdates: */ size,
+     //   /* queryDeletes: */ (numDestroys+numInvalidates),
+    //    /* totalEvents: */ (size+size+numDestroys + numInvalidates));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+  
+  /**
+   * test for registering cqs on a bridge server with local max memory zero.
+   */
+  public void testPartitionedCqOnAccessorBridgeServer() throws Exception {
+ // creating servers.
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    // creating an accessor vm with Bridge Server installed.
+    createServer(server1,true);
+    
+    createServer(server2);
+    
+    // create client 
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 1000;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // destroy all the values.
+    int numDestroys = size;
+    cqHelper.deleteValues(server2,regions[0], numDestroys);
+    
+    for (int i=1; i <= numDestroys; i++){
+      cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs after destroyes on server2.
+ 
+        
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numDestroys,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numDestroys,
+        /* totalEvents: */ (size+size+numDestroys));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+  
+  /**
+   * test for registering cqs on single Bridge server hosting all the data. 
This
+   * will generate all the events locally and should always have the old value 
+   * and should not sent the profile update on wire.
+   */
+  public void testPartitionedCqOnSingleBridgeServer() throws Exception { 
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+//    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    // creating an accessor vm with Bridge Server installed.
+    createServer(server1);
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 400;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // destroy all the values.
+    int numDestroys = size;
+    cqHelper.deleteValues(server1,regions[0], numDestroys);
+    
+    for (int i=1; i <= numDestroys; i++){
+      cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs after destroyes on server2.
+ 
+        
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numDestroys,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numDestroys,
+        /* totalEvents: */ (size+size+numDestroys));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server1);
+    
+  }
+  
+  /**
+   * test for registering cqs on single Bridge server hosting all the data. 
This
+   * will generate all the events locally but the puts, updates and destroys 
originate
+   * at an accessor vm.
+   */
+  public void testPRCqOnSingleBridgeServerUpdatesOriginatingAtAccessor() 
throws Exception { 
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    // creating an accessor vm with Bridge Server installed.
+    createServer(server1,true);
+    
+    assertLocalMaxMemory(server1);
+    
+    createServer(server2);
+    
+    // create client 
+    
+    final int port = 
server2.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server2.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 400;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // destroy all the values.
+    int numDestroys = size;
+    cqHelper.deleteValues(server1,regions[0], numDestroys);
+    
+    for (int i=1; i <= numDestroys; i++){
+      cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs after destroyes on server2.
+ 
+        
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numDestroys,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numDestroys,
+        /* totalEvents: */ (size+size+numDestroys));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+  
+  /**
+   * test to check invalidates on bridge server hosting datastores as well.
+   */
+  public void testPRCqWithInvalidatesOnBridgeServer()  {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    // creating Bridge Server with data store. clients will connect to this 
+    // bridge server.
+    createServer(server1);
+     
+    // create another server with data store.
+    createServer(server2);
+    
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 400;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // invalidate all the values.
+    int numInvalidates = size;
+    cqHelper.invalidateValues(server2,regions[0], numInvalidates);
+    
+    for (int i=1; i <= numInvalidates; i++){
+      cqHelper.waitForInvalidated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs after invalidates on server2.
+         
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+  
+  /**
+   * test cqs with invalidates on bridge server not hosting datastores.
+   * 
+   */
+  public void testPRCqWithInvalidatesOnAccessorBridgeServer() throws Exception 
{
+    
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    // creating Bridge Server with data store. clients will connect to this 
+    // bridge server.
+    createServer(server1, true);
+     
+    // create another server with data store.
+    createServer(server2);
+    
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 400;
+    createValues(server1, regions[0], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(server1, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // invalidate all the values.
+    int numInvalidates = size;
+    cqHelper.invalidateValues(server1,regions[0], numInvalidates);
+    
+    for (int i=1; i <= numInvalidates; i++){
+      cqHelper.waitForInvalidated(client, "testCQEvents_0", KEY+i);
+    } 
+    // validate cqs after invalidates on server2.
+         
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);
+  }
+  
+  /**
+   * test cqs with create updates and destroys from client on bridge server
+   * hosting datastores.
+   */
+  public void testPRCqWithUpdatesFromClients() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    // creating Bridge Server with data store. clients will connect to this 
+    // bridge server.
+    createServer(server1, false, 1);
+     
+    // create another server with data store.
+    createServer(server2, false , 1);
+    
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    createClient(client2, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    
+    
+    // create values
+    final int size = 400;
+    createValues(client2, regions[0], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    //size = 2;
+    
+    // do updates
+    createValues(client2, regions[0], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    
+    // invalidate all the values.
+    int numDelets = size;
+    
+    cqHelper.deleteValues(client2,regions[0], numDelets);
+    
+    for (int i=1; i <= numDelets; i++){
+      cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
+    }
+    
+    // validate cqs after invalidates on server2.
+         
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numDelets,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numDelets,
+        /* totalEvents: */ (size+size+numDelets));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeClient(client2);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);    
+  }
+  
+  /**
+   * test cqs on multiple partitioned region hosted by bridge servers.
+   * 
+   */
+  public void testPRCqWithMultipleRegionsOnServer() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    // creating Bridge Server with data store. clients will connect to this 
+    // bridge server.
+    createServer(server1, false, 1);
+     
+    // create another server with data store.
+    createServer(server2, false , 1);
+    
+    // Wait for server to initialize.
+    pause(2000);
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    createClient(client2, port, host0);
+    
+    // register cq.
+    createCQ(client, "testCQEvents_0", cqs[0]);
+    createCQ(client, "testCQEvents_1", cqs[10]);
+    cqHelper.executeCQ(client, "testCQEvents_0", false, null);
+    cqHelper.executeCQ(client, "testCQEvents_1", false, null);
+    
+    // create values
+    final int size = 400;
+    createValues(client2, regions[0], size);
+    createValues(client2, regions[1], size);
+    
+    // wait for last creates...
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client, "testCQEvents_0", KEY+i);
+      cqHelper.waitForCreated(client, "testCQEvents_1", KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    cqHelper.validateCQ(client, "testCQEvents_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    
+    //size = 2;
+    
+    // do updates
+    createValues(client2, regions[0], size);
+    createValues(client2, regions[1], size);
+    
+ 
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client, "testCQEvents_0", KEY+i);
+      cqHelper.waitForUpdated(client, "testCQEvents_1", KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    cqHelper.validateCQ(client, "testCQEvents_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    // invalidate all the values.
+    int numInvalidates = size;
+    //cqHelper.invalidateValues(server1,regions[0], numInvalidates);
+    cqHelper.deleteValues(client2,regions[0], numInvalidates);
+    cqHelper.deleteValues(client2,regions[1], numInvalidates);
+    
+
+    for (int i=1; i <= numInvalidates; i++){
+      cqHelper.waitForDestroyed(client, "testCQEvents_0", KEY+i);
+      cqHelper.waitForDestroyed(client, "testCQEvents_1", KEY+i);
+    }
+    
+    
+    // validate cqs after invalidates on server2.
+         
+    cqHelper.validateCQ(client, "testCQEvents_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    
+    cqHelper.validateCQ(client, "testCQEvents_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeClient(client2);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);         
+  }
+  
+  /**
+   * tests multiple cqs on partitioned region on bridge servers with profile 
update 
+   * for not requiring old values.
+   * 
+   */
+  public void testPRWithCQsAndProfileUpdates() throws Exception {
+    
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    VM client2 = host.getVM(3);
+    
+    // creating Bridge Server with data store. clients will connect to this 
+    // bridge server.
+    createServer(server1, false, 1);
+     
+    // create another server with data store.
+    createServer(server2, false , 1);
+    
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    createClient(client, port, host0);
+    createClient(client2, port, host0);
+    
+    // register cq.
+    createCQ(client, "testPRWithCQsAndProfileUpdates_0", cqs[0]); // SAME CQ 
REGISTERED TWICE.
+    createCQ(client, "testPRWithCQsAndProfileUpdates_1", cqs[0]); 
+    cqHelper.executeCQ(client, "testPRWithCQsAndProfileUpdates_0", false, 
null);
+    cqHelper.executeCQ(client, "testPRWithCQsAndProfileUpdates_1", false, 
null);
+    
+    // create values
+    final int size = 400;
+    createValues(client2, regions[0], size);
+    createValues(client2, regions[1], size);
+    
+    // wait for last creates...
+
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForCreated(client,"testPRWithCQsAndProfileUpdates_0", 
KEY+i);
+      cqHelper.waitForCreated(client,"testPRWithCQsAndProfileUpdates_1", 
KEY+i);
+    }
+    
+    // validate cq..
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    
+    
+    //size = 2;
+    
+    // do updates
+    createValues(client2, regions[0], size);
+    createValues(client2, regions[1], size);
+    
+    for (int i=1; i <= size; i++){
+      cqHelper.waitForUpdated(client,"testPRWithCQsAndProfileUpdates_0", 
KEY+i);
+      cqHelper.waitForUpdated(client,"testPRWithCQsAndProfileUpdates_1", 
KEY+i);
+    }
+    
+    // validate cqs again.
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ (size+size));
+    
+    // invalidate all the values.
+    int numInvalidates = size;
+    //cqHelper.invalidateValues(server1,regions[0], numInvalidates);
+    cqHelper.deleteValues(client2,regions[0], numInvalidates);
+    cqHelper.deleteValues(client2,regions[1], numInvalidates);
+    
+    for (int i=1; i <= numInvalidates; i++){
+      
cqHelper.waitForDestroyed(client,"testPRWithCQsAndProfileUpdates_0",KEY+i);
+      
cqHelper.waitForDestroyed(client,"testPRWithCQsAndProfileUpdates_1",KEY+i);
+    }
+    
+    
+    // validate cqs after invalidates on server2.
+         
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    
+    cqHelper.validateCQ(client, "testPRWithCQsAndProfileUpdates_1",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ size,
+        /* deletes; */ numInvalidates,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ size,
+        /* queryDeletes: */ numInvalidates,
+        /* totalEvents: */ (size+size+numInvalidates));
+    
+    // check for requries old value set.
+    /*
+    server1.invoke(new CacheSerializableRunnable("Check requires old value") {
+      public void run2()
+      {
+        Cache cc = getCache();
+        PartitionedRegion region1 = (PartitionedRegion)cc
+            .getRegion("/root/regionA");
+        Set requiresOldValue = region1.getRegionAdvisor()
+            .adviseRequiresOldValue();
+        getLogWriter().info(
+            "ize of requires old value at server1 before closing cqs : "
+                + requiresOldValue.size());
+        assertTrue("The size of requiresOldValue shoule be zero on server1",
+            (0 == requiresOldValue.size()));
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable("Check requires old value") {
+      public void run2()
+      {
+        Cache cc = getCache();
+        PartitionedRegion region1 = (PartitionedRegion)cc
+            .getRegion("/root/regionA");
+        Set requiresOldValue = region1.getRegionAdvisor()
+            .adviseRequiresOldValue();
+        getLogWriter().info(
+            "size of requires old value at server2 before closing cqs :"
+                + requiresOldValue.size());
+        assertTrue("The size of requiresOldValue should be one on server2 ",
+            (1 == requiresOldValue.size()));
+      }
+    });
+    */
+    
+    cqHelper.closeCQ(client, "testPRWithCQsAndProfileUpdates_0");
+    cqHelper.closeCQ(client, "testPRWithCQsAndProfileUpdates_1");
+    
+    
+    // check for requires old value set after closing all the cqs.
+    /*
+    REQUIRES OLD VALUES requirement is removed in the eventFilterOpt_dev_Jun09 
+    branch. The old values are no more sent to the peer, instead CQs are 
processed
+    at the source (where change happens). Replace requiresOldValue test with 
+    appropriate test.
+    
+    server1.invoke(new CacheSerializableRunnable("Check requires old value") {
+      public void run2()
+      {
+        Cache cc = getCache();
+        PartitionedRegion region1 = (PartitionedRegion)cc
+            .getRegion("/root/regionA");
+        Set requiresOldValue = region1.getRegionAdvisor()
+            .adviseRequiresOldValue();
+        getLogWriter().info(
+            "size of requires old value set at the end server1 : "
+                + requiresOldValue.size());
+        assertTrue("The size of requiresOldValue shoule be zero on server1",
+            (0 == requiresOldValue.size()));
+      }
+    });
+    
+    
+    server2.invoke(new CacheSerializableRunnable("Check requires old value") {
+      public void run2()
+      {
+        Cache cc = getCache();
+        PartitionedRegion region1 = (PartitionedRegion)cc
+            .getRegion("/root/regionA");
+        Set requiresOldValue = region1.getRegionAdvisor()
+            .adviseRequiresOldValue();
+        getLogWriter().info(
+            " size of requires old value set at the end server2 : "
+                + requiresOldValue.size());
+        assertTrue(
+            "The size of requiresOldValue shoule be zero on server2 as well 
after closing all the cqs",
+            (0 == requiresOldValue.size()));
+      }
+    });
+    */
+    cqHelper.closeClient(client);
+    cqHelper.closeClient(client2);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);     
+  }
+  
+  /**
+   * Test for events created during the CQ query execution.
+   * When CQs are executed using executeWithInitialResults 
+   * there may be possibility that the region changes during
+   * that time may not be reflected in the query result set
+   * thus making the query data and region data inconsistent.
+   * @throws Exception
+   */
+  public void testEventsDuringQueryExecution() throws Exception {
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client = host.getVM(2);
+    
+    final String cqName = "testEventsDuringQueryExecution_0";
+    
+    // Server.
+    createServer(server1);
+    createServer(server2);
+    
+    final int port = 
server1.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server1.getHost());
+    
+    // Initialize Client.
+    createClient(client, port, host0);
+    
+    // create CQ.
+    createCQ(client, cqName, cqs[0]);
+    
+    
+    final int numObjects = 200;
+    final int totalObjects = 500;
+    
+    // initialize Region.
+    server1.invoke(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+      
+    // Keep updating region (async invocation).
+    server1.invokeAsync(new CacheSerializableRunnable("Update Region"){
+      public void run2()throws CacheException {
+        Region region = getCache().getRegion("/root/" + regions[0]);
+        for (int i = numObjects + 1; i <= totalObjects; i++) {
+          Portfolio p = new Portfolio(i);
+          region.put(""+i, p);
+        }
+      }
+    });
+    
+    // Execute CQ while update is in progress.
+    client.invoke(new CacheSerializableRunnable("Execute CQ") {
+      public void run2()throws CacheException {
+        QueryService cqService = getCache().getQueryService();
+        // Get CqQuery object.
+        CqQuery cq1 = cqService.getCq(cqName);
+        if (cq1 == null) {
+          fail("Failed to get CQ " + cqName);
+        }
+        
+        SelectResults cqResults = null;
+
+        try {
+          cqResults = cq1.executeWithInitialResults();
+        } catch (Exception ex){
+          AssertionError err = new AssertionError("Failed to execute  CQ " + 
cqName);
+          err.initCause(ex);
+          throw err;
+        }
+        
+        //getLogWriter().info("initial result size = " + cqResults.size());
+        
+        
+        CqQueryTestListener cqListener = 
(CqQueryTestListener)cq1.getCqAttributes().getCqListener();
+        // Wait for the last key to arrive.
+        for (int i=0; i < 4; i++) {
+          try {
+            cqListener.waitForCreated("" + totalObjects);
+            // Found skip from the loop.
+            break;
+          } catch (CacheException ex) {
+            if (i == 3) {
+              throw ex;
+            }
+          }
+        }
+        
+        // Check if the events from CqListener are in order.
+        int oldId = 0;
+        for (Object cqEvent : cqListener.events.toArray()) { 
+          int newId = new Integer(cqEvent.toString()).intValue();
+          if (oldId > newId){
+            fail("Queued events for CQ Listener during execution with " + 
+                "Initial results is not in the order in which they are 
created.");
+          }
+          oldId = newId;
+        }
+        
+        // Check if all the IDs are present as part of Select Results and CQ 
Events.
+        HashSet ids = new HashSet(cqListener.events);
+
+        for (Object o : cqResults.asList()) {
+          Struct s = (Struct)o;
+          ids.add(s.get("key"));
+        }
+
+        //Iterator iter = cqResults.asSet().iterator();
+        //while (iter.hasNext()) {
+        //  Portfolio p = (Portfolio)iter.next();
+        //  ids.add(p.getPk());
+        //  //getLogWriter().info("Result set value : " + p.getPk());
+        //}
+        
+        HashSet missingIds = new HashSet();
+        String key = "";
+        for (int i = 1; i <= totalObjects; i++) {
+          key = "" + i;
+          if (!(ids.contains(key))){
+            missingIds.add(key);
+          }
+        }
+        
+        if (!missingIds.isEmpty()) {
+          fail("Missing Keys in either ResultSet or the Cq Event list. " +
+              " Missing keys : [size : " + missingIds.size() + "]" + 
missingIds +
+              " Ids in ResultSet and CQ Events :" + ids);
+        }
+        
+      } 
+    });
+    
+    cqHelper.closeClient(client);
+    cqHelper.closeServer(server2);
+    cqHelper.closeServer(server1);    
+  }
+
+  
+  
+  public void testDestroyRegionEventOnClientsWithCQRegistered() throws 
Exception{
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+    
+    final String cqName = "testDestroyEventOnClientsWithCQRegistered_0";
+    
+    createServerWithoutRootRegion(server, 0, false, 0);
+    
+    final int port = server.invokeInt(PartitionedRegionCqQueryDUnitTest.class, 
"getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    
+    // Initialize Client.
+    createCacheClient(client1, port, host0);
+    createCacheClient(client2, port, host0);
+    
+    createCQ(client1, cqName, cqsWithoutRoot[0]);
+    cqHelper.executeCQ(client1, cqName, false, null);
+    
+    final int numObjects = 10;
+    client1.invoke(new CacheSerializableRunnable("Populate region") {
+
+      @Override
+      public void run2() throws CacheException {
+        Region region = getCache().getRegion("/" + regions[0]);
+        for (int i = 1; i <= numObjects; i++) {
+          getCache().getLogger().fine("### DOING PUT with key: " + ("KEY-" + 
i)); 
+          Portfolio p = new Portfolio(i);
+          region.put("KEY-"+i, p);
+        }
+      }
+      
+    });
+    
+    client1.invokeAsync(new CacheSerializableRunnable("Wait for CqEvent") {
+      
+      @Override
+      public void run2() throws CacheException {
+        //Check for region destroyed event from server.
+        Region localRegion = getCache().getRegion("/"+ regions[0]);
+        assertNotNull(localRegion);
+        
+        CqQueryTestListener cqListener = 
(CqQueryTestListener)getCache().getQueryService().getCq(cqName).getCqAttributes().getCqListener();
+        assertNotNull(cqListener);
+        
+        cqListener.waitForTotalEvents(numObjects + 1 /*Destroy region event*/);
+                
+      }
+    });
+    
+    client2.invoke(new CacheSerializableRunnable("Destroy region on server") {
+      
+      @Override
+      public void run2() throws CacheException {
+        //Check for region destroyed event from server.
+        Region localRegion = getCache().getRegion("/"+ regions[0]);
+        assertNotNull(localRegion);
+        
+        localRegion.destroyRegion();
+      }
+    });
+    
+    client1.invoke(new CacheSerializableRunnable("Check for destroyed region 
and closed CQ") {
+      
+      @Override
+      public void run2() throws CacheException {
+        // Check for region destroyed event from server.
+        Region localRegion = getCache().getRegion("/" + regions[0]);
+        // IF NULL - GOOD
+        // ELSE - get listener and wait for destroyed.
+        if (localRegion != null) {
+
+          // REGION NULL
+          Log.getLogWriter().info("Local region is NOT null in client 1");
+          
+          pause(5*1000);
+          CqQuery[] cqs = getCache().getQueryService().getCqs();
+          if (cqs != null && cqs.length > 0) {
+            assertTrue(cqs[0].isClosed());
+          }
+
+          // If cqs length is zero then Cq got closed and removed from 
CQService.
+          assertNull(
+              "Region is still available on client1 even after performing 
destroyRegion from client2 on server."
+                  + "Client1 must have received destroyRegion message from 
server with CQ parts in it.",
+              getCache().getRegion("/" + regions[0]));
+
+        }
+      }
+    });
+    
+    cqHelper.closeServer(server);
+  }
+  
+  // helper methods.
+  
+  /**
+   * create bridge server with default attributes for partitioned region.
+   */
+  public void createServer(VM server) {
+    createServer(server, 0, false, 0);
+  }
+  
+  /**
+   * create accessor vm if the given accessor parameter variable is true.
+   * @param server VM to create bridge server.
+   * @param accessor boolean if true creates an accessor bridge server.
+   */
+  public void createServer(VM server, boolean accessor){
+    createServer(server, 0, accessor, 0);
+  }
+  
+  /**
+   * create server with partitioned region with redundant copies.
+   * @param server VM where to create the bridge server.
+   * @param accessor boolean if true create partitioned region with local max 
memory zero.
+   * @param redundantCopies number of redundant copies for a partitioned 
region.
+   */
+  public void createServer(VM server, boolean accessor, int redundantCopies){
+    createServer(server, 0, accessor, redundantCopies);
+  }
+  
+  /**
+   * Create a bridge server with partitioned region.
+   * @param server VM where to create the bridge server.
+   * @param port bridge server port.
+   * @param isAccessor if true the under lying partitioned region will not 
host data on this vm.
+   * @param redundantCopies number of redundant copies for the primary bucket.
+   */
+  public void createServer(VM server, final int port,final boolean isAccessor, 
final int redundantCopies)
+  {
+    SerializableRunnable createServer = new CacheSerializableRunnable(
+        "Create Cache Server") {
+      public void run2() throws CacheException
+      {
+          getLogWriter().info("### Create Cache Server. ###");
+          //AttributesFactory factory = new AttributesFactory();
+          //factory.setScope(Scope.DISTRIBUTED_ACK);
+          //factory.setMirrorType(MirrorType.KEYS_VALUES);
+          
+          //int maxMem = 0;
+          AttributesFactory attr = new AttributesFactory();
+          //attr.setValueConstraint(valueConstraint);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          if (isAccessor){
+            paf.setLocalMaxMemory(0);
+          }
+          PartitionAttributes prAttr = 
paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create();
+          attr.setPartitionAttributes(prAttr);
+          
+          assertFalse(getSystem().isLoner());
+          
//assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size()
 > 0);
+          for (int i = 0; i < regions.length; i++) {
+            Region r = createRegion(regions[i], attr.create());
+            getLogWriter().info("Server created the region: "+r);
+          }
+//          pause(2000);
+          try {
+            startBridgeServer(port, true);
+          }
+          catch (Exception ex) {
+            fail("While starting CacheServer", ex);
+          }
+//          pause(2000);
+       
+      }
+    };
+
+    server.invoke(createServer);
+  }
+  
+  /**
+   * Create a bridge server with partitioned region.
+   * @param server VM where to create the bridge server.
+   * @param port bridge server port.
+   * @param isAccessor if true the under lying partitioned region will not 
host data on this vm.
+   * @param redundantCopies number of redundant copies for the primary bucket.
+   */
+  public void createServerWithoutRootRegion(VM server, final int port,final 
boolean isAccessor, final int redundantCopies)
+  {
+    SerializableRunnable createServer = new CacheSerializableRunnable(
+        "Create Cache Server") {
+      public void run2() throws CacheException
+      {
+          getLogWriter().info("### Create Cache Server. ###");
+          //AttributesFactory factory = new AttributesFactory();
+          //factory.setScope(Scope.DISTRIBUTED_ACK);
+          //factory.setMirrorType(MirrorType.KEYS_VALUES);
+          
+          //int maxMem = 0;
+          AttributesFactory attr = new AttributesFactory();
+          //attr.setValueConstraint(valueConstraint);
+          PartitionAttributesFactory paf = new PartitionAttributesFactory();
+          if (isAccessor){
+            paf.setLocalMaxMemory(0);
+          }
+          PartitionAttributes prAttr = 
paf.setTotalNumBuckets(1).setRedundantCopies(redundantCopies).create();
+          attr.setPartitionAttributes(prAttr);
+          
+          assertFalse(getSystem().isLoner());
+          
//assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size()
 > 0);
+          for (int i = 0; i < regions.length; i++) {
+            Region r = createRegionWithoutRoot(regions[i], attr.create());
+            getLogWriter().info("Server created the region: "+r);
+          }
+//          pause(2000);
+          try {
+            startBridgeServer(port, true);
+          }
+          catch (Exception ex) {
+            fail("While starting CacheServer", ex);
+          }
+//          pause(2000);
+      }
+
+      private Region createRegionWithoutRoot(String regionName,
+          RegionAttributes create) {
+        getCache().createRegion(regionName, create);
+        return null;
+      }
+    };
+
+    server.invoke(createServer);
+  }
+  /**
+   * Starts a bridge server on the given port, using the given
+   * deserializeValues and notifyBySubscription to serve up the
+   * given region.
+   *
+   * @since 5.5
+   */
+  protected void startBridgeServer(int port, boolean notifyBySubscription)
+  throws IOException {
+    
+    Cache cache = getCache();
+    CacheServer bridge = cache.addCacheServer();
+    bridge.setPort(port);
+    bridge.setNotifyBySubscription(notifyBySubscription);
+    bridge.start();
+    bridgeServerPort = bridge.getPort();
+  }
+  
+  /* Create Client */
+  public void createClient(VM client, final int serverPort, final String 
serverHost) {
+    int[] serverPorts = new int[] {serverPort};
+    createClient(client, serverPorts, serverHost, null); 
+  }
+  
+  
+  /* Create Client */
+  public void createClient(VM client, final int[] serverPorts, final String 
serverHost, final String redundancyLevel) {
+    SerializableRunnable createQService =
+      new CacheSerializableRunnable("Create Client") {
+      public void run2() throws CacheException {
+        getLogWriter().info("### Create Client. ###");
+        getLogWriter().info(
+            "Will connect to server at por: " + serverPorts[0] + " and at host 
: "
+             + serverHost);
+        //Region region1 = null;
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        
+        if (redundancyLevel != null){
+          ClientServerTestCase.configureConnectionPool(regionFactory, 
serverHost, serverPorts[0],-1, true, Integer.parseInt(redundancyLevel), -1, 
null);
+        } else {
+          ClientServerTestCase.configureConnectionPool(regionFactory, 
serverHost, serverPorts[0], -1, true, -1, -1, null);
+        }
+        
+        for (int i=0; i < regions.length; i++) {        
+          Region clientRegion = createRegion(regions[i], 
regionFactory.createRegionAttributes());
+          getLogWriter().info("### Successfully Created Region on Client :" + 
clientRegion);
+          //region1.getAttributesMutator().setCacheListener(new CqListener());
+        }
+      }
+    };
+    
+    client.invoke(createQService);
+  }
+  
+  public void createCQ(VM vm, final String cqName, final String queryStr) {
+    vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) {
+      public void run2() throws CacheException {
+        //pause(60 * 1000);
+        //getLogWriter().info("### DEBUG CREATE CQ START ####");
+        //pause(20 * 1000);
+        
+        getLogWriter().info("### Create CQ. ###" + cqName);
+        // Get CQ Service.
+        QueryService cqService = null;
+        try {
+          cqService = getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        // Create CQ Attributes.
+        CqAttributesFactory cqf = new CqAttributesFactory();
+        CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())};
+        ((CqQueryTestListener)cqListeners[0]).cqName = cqName;
+        
+        cqf.initCqListeners(cqListeners);
+        CqAttributes cqa = cqf.create();
+        
+        // Create CQ.
+        try {
+          CqQuery cq1 = cqService.newCq(cqName, queryStr, cqa);
+          assertTrue("newCq() state mismatch", cq1.getState().isStopped());
+          getLogWriter().info("Created a new CqQuery : "+cq1);
+        } catch (Exception ex){
+          AssertionError err = new AssertionError("Failed to create CQ " + 
cqName + " . ");
+          err.initCause(ex);
+          getLogWriter().info("CqService is :" + cqService, err);
+          throw err;
+        }
+      }
+    });   
+  } 
+  
+  /* Returs Cache Server Port */
+  private static int getCacheServerPort() {
+    return bridgeServerPort;
+  }
+  
+  private static String[] getCqs() {
+    CqQuery[] cqs = CacheFactory.getAnyInstance().getQueryService().getCqs();
+    
+    String[] cqnames = new String[cqs.length];
+    int idx = 0;
+    for(CqQuery cq : cqs) {
+      cqnames[idx++] = cq.getName();
+    }
+    
+    return cqnames;
+  }
+  
+  private static int getCqCountFromRegionProfile() {
+    
+    LocalRegion region1 = 
(LocalRegion)CacheFactory.getAnyInstance().getRegion("/root/regionA");
+    
+    return region1.getFilterProfile().getCqCount();
+  }
+  
+  /* Create/Init values */
+  public void createValues(VM vm, final String regionName, final int size) {
+    vm.invoke(new CacheSerializableRunnable("Create values for region : 
"+regionName) {
+      public void run2() throws CacheException {
+        Region region1 = getRootRegion().getSubregion(regionName);
+        for (int i = 1; i <= size; i++) {
+          region1.put(KEY+i, new Portfolio(i));
+        }
+        getLogWriter().info("### Number of Entries in Region :" + 
region1.keys().size());
+      }
+    });
+  }
+  
+  private void assertLocalMaxMemory (VM vm){
+    vm.invoke(new CacheSerializableRunnable("Create values") {
+      public void run2() throws CacheException {
+        for (int i = 0; i< regions.length ; i++) {
+          Region region = getRootRegion().getSubregion(regions[i]);
+          assertEquals("The region should be configure with local max memory 
zero : "
+                  + region, region.getAttributes().getPartitionAttributes()
+                  .getLocalMaxMemory(), 0);
+        }
+        
+      }
+    });
+  }
+  
+  
+  public void createCacheClient(VM client, final int serverPort, final String 
serverHost){
+    createCacheClient(client, new String[]{serverHost}, new int[]{serverPort}, 
null);
+  }
+  
+  public void createCacheClient(VM vm, final String[] serverHosts, final int[] 
serverPorts, final String redundancyLevel) {
+    vm.invoke(new CacheSerializableRunnable("createCacheClient") {
+      public void run2() throws CacheException {
+        getLogWriter().info(
+            "Will connect to server at por: " + serverPorts[0] + " and at host 
: "
+             + serverHosts[0]);
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        
ccf.addPoolServer(serverHosts[0]/*getServerHostName(Host.getHost(0))*/, 
serverPorts[0]);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.set("log-level", getDUnitLogLevel());
+        
+        // Create Client Cache.
+        getClientCache(ccf);
+        
+        //Create regions
+        // Initialize CQ Service.
+        try {
+          getCache().getQueryService();
+        } catch (Exception cqe) {
+          cqe.printStackTrace();
+          fail("Failed to getCQService.");
+        }
+        
+        AttributesFactory regionFactory = new AttributesFactory();
+        regionFactory.setScope(Scope.LOCAL);
+        
+        if (redundancyLevel != null){
+          ClientServerTestCase.configureConnectionPool(regionFactory, 
serverHosts[0], serverPorts[0],-1, true, Integer.parseInt(redundancyLevel), -1, 
null);
+        } else {
+          ClientServerTestCase.configureConnectionPool(regionFactory, 
serverHosts[0], serverPorts[0], -1, true, -1, -1, null);
+        }
+        
+        for (int i=0; i < regions.length; i++) {        
+          Region clientRegion = 
((ClientCache)getCache()).createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY)
+          .create(regions[i]);
+          getLogWriter().info("### Successfully Created Region on Client :" + 
clientRegion);
+          //region1.getAttributesMutator().setCacheListener(new CqListener());
+        }
+      }
+    });   
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..a9a1ac8
--- /dev/null
+++ 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/PartitionedRegionCqQueryOptimizedExecuteDUnitTest.java
@@ -0,0 +1,238 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.query.data.Portfolio;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+
+import dunit.Host;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+public class PartitionedRegionCqQueryOptimizedExecuteDUnitTest extends 
PartitionedRegionCqQueryDUnitTest{
+
+  public PartitionedRegionCqQueryOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+  
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+        CqServiceProvider.MAINTAIN_KEYS = true;
+      }
+    });
+    super.tearDown2();
+  }
+  
+  public void testCqExecuteWithoutQueryExecution() throws Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+    final int numOfEntries = 10;
+    final String cqName = "testCqExecuteWithoutQueryExecution_1";
+
+    createServer(server);
+    // Create values.
+    createValues(server, regions[0], numOfEntries);
+
+    final int thePort = 
server.invokeInt(PartitionedRegionCqQueryOptimizedExecuteDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    // Create client.
+    createClient(client, thePort, host0);
+
+     /* Create CQs. */
+    createCQ(client, cqName, cqs[0]); 
+    
+    cqHelper.validateCQCount(client, 1);
+    
+    cqHelper.executeCQ(client, cqName, false, null);
+
+    server.invoke(new CacheSerializableRunnable("execute cq") {
+      public void run2() throws CacheException {
+        assertFalse("CqServiceImpl.EXECUTE_QUERY_DURING_INIT flag should be 
false ", CqServiceImpl.EXECUTE_QUERY_DURING_INIT);
+        int numOfQueryExecutions = (Integer) 
((GemFireCacheImpl)getCache()).getCachePerfStats().getStats().get("queryExecutions");
+        assertEquals("Number of query executions for cq.execute should be 0 ", 
0, numOfQueryExecutions);
+      }
+    });
+    
+    // Create more values.
+    server.invoke(new CacheSerializableRunnable("Create values") {
+      public void run2() throws CacheException {
+        Region region1 = getRootRegion().getSubregion(regions[0]);
+        for (int i = numOfEntries+1; i <= numOfEntries*2; i++) {
+          region1.put(KEY+i, new Portfolio(i));
+        }
+        getLogWriter().info("### Number of Entries in Region :" + 
region1.keys().size());
+      }
+    });
+    
+    cqHelper.waitForCreated(client, cqName, KEY+numOfEntries*2);
+
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ numOfEntries);
+    
+    // Update values.
+    createValues(server, regions[0], 5);
+    createValues(server, regions[0], 10);
+    
+    cqHelper.waitForUpdated(client, cqName, KEY+numOfEntries);
+    
+    
+    // validate Update events.
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 15,
+        /* deletes; */ 0,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 15,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ numOfEntries + 15);
+    
+    // Validate delete events.
+    cqHelper.deleteValues(server, regions[0], 5);
+    cqHelper.waitForDestroyed(client, cqName, KEY+5);
+    
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 15,
+        /* deletes; */5,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 15,
+        /* queryDeletes: */ 5,
+        /* totalEvents: */ numOfEntries + 15 + 5);
+
+    cqHelper.closeClient(client);   
+    cqHelper.closeServer(server);    
+  }
+  
+  public void testCqExecuteWithoutQueryExecutionAndNoRSCaching() throws 
Exception {
+    final Host host = Host.getHost(0);
+    final VM server = host.getVM(0);
+    final VM client = host.getVM(1);
+    final int numOfEntries = 10;
+    final String cqName = "testCqExecuteWithoutQueryExecution_1";
+
+    server.invoke(new CacheSerializableRunnable("execute cq") {
+      public void run2() throws CacheException {
+        CqServiceProvider.MAINTAIN_KEYS = false;
+      }
+    });
+    
+    createServer(server);
+    // Create values.
+    createValues(server, regions[0], numOfEntries);
+
+    final int thePort = 
server.invokeInt(PartitionedRegionCqQueryDUnitTest.class,
+        "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+
+    // Create client.
+    createClient(client, thePort, host0);
+
+     /* Create CQs. */
+    createCQ(client, cqName, cqs[0]); 
+    
+    cqHelper.validateCQCount(client, 1);
+    
+    cqHelper.executeCQ(client, cqName, false, null);
+
+    server.invoke(new CacheSerializableRunnable("execute cq") {
+      public void run2() throws CacheException {
+        assertFalse("CqServiceImpl.EXECUTE_QUERY_DURING_INIT flag should be 
false ", CqServiceImpl.EXECUTE_QUERY_DURING_INIT);
+        assertFalse("gemfire.cq.MAINTAIN_KEYS flag should be false ", 
CqServiceProvider.MAINTAIN_KEYS);
+        int numOfQueryExecutions = (Integer) 
((GemFireCacheImpl)getCache()).getCachePerfStats().getStats().get("queryExecutions");
+        assertEquals("Number of query executions for cq.execute should be 0 ", 
0, numOfQueryExecutions);
+      }
+    });
+    
+    // Create more values.
+    server.invoke(new CacheSerializableRunnable("Create values") {
+      public void run2() throws CacheException {
+        Region region1 = getRootRegion().getSubregion(regions[0]);
+        for (int i = numOfEntries+1; i <= numOfEntries*2; i++) {
+          region1.put(KEY+i, new Portfolio(i));
+        }
+        getLogWriter().info("### Number of Entries in Region :" + 
region1.keys().size());
+      }
+    });
+    
+    cqHelper.waitForCreated(client, cqName, KEY+numOfEntries*2);
+
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ numOfEntries);
+    
+    // Update values.
+    createValues(server, regions[0], 5);
+    createValues(server, regions[0], 10);
+    
+    cqHelper.waitForUpdated(client, cqName, KEY+numOfEntries);
+    
+    
+    // validate Update events.
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 15,
+        /* deletes; */ 0,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 15,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ numOfEntries + 15);
+    
+    // Validate delete events.
+    cqHelper.deleteValues(server, regions[0], 5);
+    cqHelper.waitForDestroyed(client, cqName, KEY+5);
+    
+    cqHelper.validateCQ(client, cqName,
+        /* resultSize: */ cqHelper.noTest,
+        /* creates: */ numOfEntries,
+        /* updates: */ 15,
+        /* deletes; */5,
+        /* queryInserts: */ numOfEntries,
+        /* queryUpdates: */ 15,
+        /* queryDeletes: */ 5,
+        /* totalEvents: */ numOfEntries + 15 + 5);
+
+    cqHelper.closeClient(client);   
+    cqHelper.closeServer(server);    
+  }
+}

Reply via email to