http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java new file mode 100644 index 0000000..f4f524d --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolDUnitTest.java @@ -0,0 +1,3315 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ + +package com.gemstone.gemfire.cache.query.cq.dunit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.EvictionAttributes; +import com.gemstone.gemfire.cache.MirrorType; +import com.gemstone.gemfire.cache.PartitionAttributes; +import com.gemstone.gemfire.cache.PartitionAttributesFactory; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.PoolFactory; +import com.gemstone.gemfire.cache.client.PoolManager; +import com.gemstone.gemfire.cache.query.CqAttributes; +import com.gemstone.gemfire.cache.query.CqAttributesFactory; +import com.gemstone.gemfire.cache.query.CqAttributesMutator; +import com.gemstone.gemfire.cache.query.CqExistsException; +import com.gemstone.gemfire.cache.query.CqListener; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.IndexType; +import com.gemstone.gemfire.cache.query.Query; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.cache.query.data.Portfolio; +import com.gemstone.gemfire.cache.query.internal.CqStateImpl; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache30.ClientServerTestCase; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.cache30.CertifiableTestCacheListener; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.InitialImageOperation; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; + +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.SerializableRunnable; +import dunit.VM; + +/** + * This class tests the ContiunousQuery mechanism in GemFire. + * It does so by creating a cache server with a cache and a pre-defined region and + * a data loader. The client creates the same region and attaches the connection pool. + * + * + * @author anil + */ +public class CqQueryUsingPoolDUnitTest extends CacheTestCase { + + /** The port on which the bridge server was started in this VM */ + private static int bridgeServerPort; + + protected static int port = 0; + protected static int port2 = 0; + + public static int noTest = -1; + + public final String[] regions = new String[] { + "regionA", + "regionB" + }; + + private final static int CREATE = 0; + private final static int UPDATE = 1; + private final static int DESTROY = 2; + private final static int INVALIDATE = 3; + private final static int CLOSE = 4; + private final static int REGION_CLEAR = 5; + private final static int REGION_INVALIDATE = 6; + + static public final String KEY = "key-"; + + static private final String WAIT_PROPERTY = "CqQueryTest.maxWaitTime"; + + static private final int WAIT_DEFAULT = (20 * 1000); + + public static final long MAX_TIME = Integer.getInteger(WAIT_PROPERTY, + WAIT_DEFAULT).intValue(); + + public final String[] cqs = new String [] { + //0 - Test for ">" + "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0", + + //1 - Test for "=" and "and". + "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID = 2 and p.status='active'", + + //2 - Test for "<" and "and". + "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active'", + + // FOLLOWING CQS ARE NOT TESTED WITH VALUES; THEY ARE USED TO TEST PARSING LOGIC WITHIN CQ. + //3 + "SELECT * FROM /root/" + regions[0] + " ;", + //4 + "SELECT ALL * FROM /root/" + regions[0], + //5 + "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; " + + "SELECT ALL * FROM /root/" + regions[0] + " TYPE Portfolio", + //6 + "import com.gemstone.gemfire.cache.\"query\".data.Portfolio; " + + "SELECT ALL * FROM /root/" + regions[0] + " p TYPE Portfolio", + //7 + "SELECT ALL * FROM /root/" + regions[1] + " p where p.ID < 5 and p.status='active';", + //8 + "SELECT ALL * FROM /root/" + regions[0] + " ;", + //9 + "SELECT ALL * FROM /root/" + regions[0] +" p where p.description = NULL", + // 10 + "SELECT ALL * FROM /root/" + regions[0] + " p where p.ID > 0 and p.status='active'", + //11 - Test for "No Alias" + "SELECT ALL * FROM /root/" + regions[0] + " where ID > 0", + + }; + + private String[] invalidCQs = new String [] { + // Test for ">" + "SELECT ALL * FROM /root/invalidRegion p where p.ID > 0" + }; + + public CqQueryUsingPoolDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + + //We're seeing this on the server when the client + //disconnects. + addExpectedException("Connection reset"); + addExpectedException("SocketTimeoutException"); + addExpectedException("ServerConnectivityException"); + addExpectedException("Socket Closed"); + addExpectedException("SocketException"); + // avoid IllegalStateException from HandShake by connecting all vms tor + // system before creating connection pools + getSystem(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + getSystem(); + } + }); + + } + + /* Returns Cache Server Port */ + static int getCacheServerPort() { + return bridgeServerPort; + } + + /* Create Cache Server */ + public void createServer(VM server) + { + createServer(server, 0); + } + + public void createServer(VM server, final int p) + { + createServer(server, p, false); + } + + public void createServer(VM server, final int thePort, final boolean eviction) { + MirrorType mirrorType = MirrorType.KEYS_VALUES; + createServer(server, thePort, eviction, mirrorType); + } + + public void createServer(VM server, final int thePort, final boolean eviction, final MirrorType mirrorType) + { + SerializableRunnable createServer = new CacheSerializableRunnable( + "Create Cache Server") { + public void run2() throws CacheException + { + getLogWriter().info("### Create Cache Server. ###"); + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.DISTRIBUTED_ACK); + factory.setMirrorType(mirrorType); + + // setting the eviction attributes. + if (eviction) { + EvictionAttributes evictAttrs = EvictionAttributes + .createLRUEntryAttributes(100000, EvictionAction.OVERFLOW_TO_DISK); + factory.setEvictionAttributes(evictAttrs); + } + + for (int i = 0; i < regions.length; i++) { + createRegion(regions[i], factory.createRegionAttributes()); + } + + try { + startBridgeServer(thePort, true); + } + + catch (Exception ex) { + fail("While starting CacheServer", ex); + } + + } + }; + + server.invoke(createServer); + } + + /** + * Create a bridge server with partitioned region. + * @param server VM where to create the bridge server. + * @param port bridge server port. + * @param isAccessor if true the under lying partitioned region will not host data on this vm. + * @param redundantCopies number of redundant copies for the primary bucket. + */ + public void createServerWithPR(VM server, final int port, final boolean isAccessor, final int redundantCopies) + { + SerializableRunnable createServer = new CacheSerializableRunnable( + "Create Cache Server") { + public void run2() throws CacheException + { + getLogWriter().info("### Create Cache Server. ###"); + //AttributesFactory factory = new AttributesFactory(); + //factory.setScope(Scope.DISTRIBUTED_ACK); + //factory.setMirrorType(MirrorType.KEYS_VALUES); + + //int maxMem = 0; + AttributesFactory attr = new AttributesFactory(); + //attr.setValueConstraint(valueConstraint); + PartitionAttributesFactory paf = new PartitionAttributesFactory(); + if (isAccessor){ + paf.setLocalMaxMemory(0); + } + PartitionAttributes prAttr = paf.setTotalNumBuckets(197).setRedundantCopies(redundantCopies).create(); + attr.setPartitionAttributes(prAttr); + + assertFalse(getSystem().isLoner()); + //assertTrue(getSystem().getDistributionManager().getOtherDistributionManagerIds().size() > 0); + for (int i = 0; i < regions.length; i++) { + Region r = createRegion(regions[i], attr.create()); + getLogWriter().info("Server created the region: "+r); + } + try { + startBridgeServer(port, true); + } + catch (Exception ex) { + fail("While starting CacheServer", ex); + } + + } + }; + + server.invoke(createServer); + } + + + /* Close Cache Server */ + public void closeServer(VM server) { + server.invoke(new SerializableRunnable("Close CacheServer") { + public void run() { + getLogWriter().info("### Close CacheServer. ###"); + stopBridgeServer(getCache()); + } + }); + } + + /* Create Client */ + public void createClient(VM client, final int serverPort, final String serverHost) { + int[] serverPorts = new int[] {serverPort}; + createClient(client, serverPorts, serverHost, null, null); + } + + /* Create Client */ + public void createClient(VM client, final int[] serverPorts, final String serverHost, final String redundancyLevel, + final String poolName) { + SerializableRunnable createQService = + new CacheSerializableRunnable("Create Client") { + public void run2() throws CacheException { + getLogWriter().info("### Create Client. ###"); + //Region region1 = null; + // Initialize CQ Service. + try { + getCache().getQueryService(); + addExpectedException("java.net.ConnectException"); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + AttributesFactory regionFactory = new AttributesFactory(); + regionFactory.setScope(Scope.LOCAL); + + if (poolName != null) { + regionFactory.setPoolName(poolName); + } else { + if (redundancyLevel != null){ + ClientServerTestCase.configureConnectionPool(regionFactory, serverHost, serverPorts, true, Integer.parseInt(redundancyLevel), -1, null); + } else { + ClientServerTestCase.configureConnectionPool(regionFactory, serverHost,serverPorts, true, -1, -1, null); + } + } + for (int i=0; i < regions.length; i++) { + createRegion(regions[i], regionFactory.createRegionAttributes()); + getLogWriter().info("### Successfully Created Region on Client :" + regions[i]); + //region1.getAttributesMutator().setCacheListener(new CqListener()); + } + } + }; + + client.invoke(createQService); + } + + + /* Close Client */ + public void closeClient(VM client) { + SerializableRunnable closeCQService = + new CacheSerializableRunnable("Close Client") { + public void run2() throws CacheException { + getLogWriter().info("### Close Client. ###"); + try { + ((DefaultQueryService)getCache().getQueryService()).closeCqService(); + } catch (Exception ex) { + getLogWriter().info("### Failed to get CqService during ClientClose() ###"); + } + + } + }; + + client.invoke(closeCQService); + pause(1000); + } + + public void createFunctionalIndex(VM vm, final String indexName, final String indexedExpression, final String fromClause) { + vm.invoke(new CacheSerializableRunnable("Create Functional Index") { + public void run2() throws CacheException { + QueryService qs = null; + try { + qs = getCache().getQueryService(); + }catch (Exception ex) { + getLogWriter().info("### Failed to get CqService during ClientClose() ###"); + } + try { + qs.createIndex(indexName, IndexType.FUNCTIONAL, indexedExpression, fromClause); + } catch (Exception ex) { + getLogWriter().info("### Failed to create Index :" + indexName); + } + } + }); + } + + /* Create/Init values */ + public void createValues(VM vm, final String regionName, final int size) { + vm.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regionName); + for (int i = 1; i <= size; i++) { + region1.put(KEY+i, new Portfolio(i, i)); + } + getLogWriter().info("### Number of Entries in Region :" + region1.keys().size()); + } + }); + } + + /* Create/Init values */ + public void createValuesWithTime(VM vm, final String regionName, final int size) { + vm.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regionName); + for (int i = 1; i <= size; i++) { + Portfolio portfolio = new Portfolio(i); + portfolio.createTime = System.currentTimeMillis(); + region1.put(KEY+i, portfolio); + } + getLogWriter().info("### Number of Entries in Region :" + region1.keys().size()); + } + }); + } + + /* delete values */ + public void deleteValues(VM vm, final String regionName, final int size) { + vm.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regionName); + for (int i = 1; i <= size; i++) { + region1.destroy(KEY+i); + } + getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size()); + } + + }); + } + + /** + * support for invalidating values. + */ + public void invalidateValues(VM vm, final String regionName, final int size) { + vm.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regionName); + for (int i = 1; i <= size; i++) { + region1.invalidate(KEY+i); + } + getLogWriter().info("### Number of Entries In Region after Delete :" + region1.keys().size()); + } + + }); + } + + public void createPool(VM vm, String poolName, String server, int port) { + createPool(vm, poolName, new String[]{server}, new int[]{port}); + } + + public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports) { + createPool(vm, poolName, servers, ports, null); + } + + public void createPool(VM vm, final String poolName, final String[] servers, final int[] ports, final String redundancyLevel) { + vm.invoke(new CacheSerializableRunnable("createPool :" + poolName) { + public void run2() throws CacheException { + // Create Cache. + getCache(); + addExpectedException("java.net.ConnectException"); + + PoolFactory cpf = PoolManager.createFactory(); + cpf.setSubscriptionEnabled(true); + + if (redundancyLevel != null){ + int redundancy = Integer.parseInt(redundancyLevel); + cpf.setSubscriptionRedundancy(redundancy); + } + + for (int i=0; i < servers.length; i++){ + getLogWriter().info("### Adding to Pool. ### Server : " + servers[i] + " Port : " + ports[i]); + cpf.addServer(servers[i], ports[i]); + } + + cpf.create(poolName); + } + }); + } + + /* Register CQs */ + public void createCQ(VM vm, final String poolName, final String cqName, final String queryStr) { + vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) { + public void run2() throws CacheException { + //pause(60 * 1000); + //getLogWriter().info("### DEBUG CREATE CQ START ####"); + //pause(20 * 1000); + + getLogWriter().info("### Create CQ. ###" + cqName); + // Get CQ Service. + QueryService qService = null; + try { + qService = (PoolManager.find(poolName)).getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + // Create CQ Attributes. + CqAttributesFactory cqf = new CqAttributesFactory(); + CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())}; + ((CqQueryTestListener)cqListeners[0]).cqName = cqName; + + cqf.initCqListeners(cqListeners); + CqAttributes cqa = cqf.create(); + + // Create CQ. + try { + CqQuery cq1 = qService.newCq(cqName, queryStr, cqa); + assertTrue("newCq() state mismatch", cq1.getState().isStopped()); + } catch (Exception ex){ + AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . "); + err.initCause(ex); + getLogWriter().info("QueryService is :" + qService, err); + throw err; + } + } + }); + } + + // REMOVE.......... + public void createCQ(VM vm, final String cqName, final String queryStr) { + vm.invoke(new CacheSerializableRunnable("Create CQ :" + cqName) { + public void run2() throws CacheException { + //pause(60 * 1000); + //getLogWriter().info("### DEBUG CREATE CQ START ####"); + //pause(20 * 1000); + + getLogWriter().info("### Create CQ. ###" + cqName); + // Get CQ Service. + QueryService qService = null; + try { + qService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + // Create CQ Attributes. + CqAttributesFactory cqf = new CqAttributesFactory(); + CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())}; + ((CqQueryTestListener)cqListeners[0]).cqName = cqName; + + cqf.initCqListeners(cqListeners); + CqAttributes cqa = cqf.create(); + + // Create CQ. + try { + CqQuery cq1 = qService.newCq(cqName, queryStr, cqa); + assertTrue("newCq() state mismatch", cq1.getState().isStopped()); + } catch (Exception ex){ + AssertionError err = new AssertionError("Failed to create CQ " + cqName + " . "); + err.initCause(ex); + getLogWriter().info("QueryService is :" + qService, err); + throw err; + } + } + }); + } + + /* Register CQs with no name, execute, and close*/ + public void createAndExecCQNoName(VM vm, final String poolName, final String queryStr) { + vm.invoke(new CacheSerializableRunnable("Create CQ with no name:" ) { + public void run2() throws CacheException { + //pause(60 * 1000); + getLogWriter().info("### DEBUG CREATE CQ START ####"); + //pause(20 * 1000); + + getLogWriter().info("### Create CQ with no name. ###"); + // Get CQ Service. + QueryService qService = null; + CqQuery cq1 = null; + String cqName = null; + + try { + qService = (PoolManager.find(poolName)).getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + SelectResults cqResults = null; + for (int i = 0; i < 20; ++i) { + // Create CQ Attributes. + CqAttributesFactory cqf = new CqAttributesFactory(); + CqListener[] cqListeners = {new CqQueryTestListener(getLogWriter())}; + + cqf.initCqListeners(cqListeners); + CqAttributes cqa = cqf.create(); + + // Create CQ with no name and execute with initial results. + try { + cq1 = qService.newCq(queryStr, cqa); + ((CqQueryTestListener)cqListeners[0]).cqName = cq1.getName(); + } catch (Exception ex){ + getLogWriter().info("CQService is :" + qService); + fail("Failed to create CQ with no name" + " . ", ex); + } + + if (cq1 == null) { + getLogWriter().info("Failed to get CqQuery object for CQ with no name."); + } + else { + cqName = cq1.getName(); + getLogWriter().info("Created CQ with no name, generated CQ name: " + cqName + " CQ state:" + cq1.getState()); + assertTrue("Create CQ with no name illegal state", cq1.getState().isStopped()); + } + if ( i%2 == 0) { + try { + cqResults = cq1.executeWithInitialResults(); + } catch (Exception ex){ + getLogWriter().info("CqService is :" + qService); + fail("Failed to execute CQ with initial results, cq name: " + + cqName + " . ", ex); + } + getLogWriter().info("initial result size = " + cqResults.size()); + getLogWriter().info("CQ state after execute with initial results = " + cq1.getState()); + assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning()); + } + else { + try { + cq1.execute(); + } catch (Exception ex){ + getLogWriter().info("CQService is :" + qService); + fail("Failed to execute CQ " + cqName + " . ", ex); + } + getLogWriter().info("CQ state after execute = " + cq1.getState()); + assertTrue("execute() state mismatch", cq1.getState().isRunning()); + } + + //Close the CQ + try { + cq1.close(); + } catch (Exception ex){ + getLogWriter().info("CqService is :" + qService); + fail("Failed to close CQ " + cqName + " . ", ex); + } + assertTrue("closeCq() state mismatch", cq1.getState().isClosed()); + } + } + }); + } + + public void executeCQ(VM vm, final String cqName, final boolean initialResults, + String expectedErr) { + executeCQ(vm, cqName, initialResults, noTest, null, expectedErr); + } + + /** + * Execute/register CQ as running. + * @param initialResults true if initialResults are requested + * @param expectedResultsSize if >= 0, validate results against this size + * @param expectedErr if not null, an error we expect + */ + public void executeCQ(VM vm, final String cqName, + final boolean initialResults, + final int expectedResultsSize, + final String[] expectedKeys, + final String expectedErr) { + vm.invoke(new CacheSerializableRunnable("Execute CQ :" + cqName) { + + private void work() throws CacheException { + //pause(60 * 1000); + getLogWriter().info("### DEBUG EXECUTE CQ START ####"); + //pause(20 * 1000); + + // Get CQ Service. + QueryService cqService = null; + CqQuery cq1 = null; +// try { + cqService = getCache().getQueryService(); +// } catch (Exception cqe) { +// getLogWriter().error(cqe); +// AssertionError err = new AssertionError("Failed to get QueryService" + cqName); +// err.initCause(ex); +// throw err; +// fail("Failed to getCQService."); +// } + + // Get CqQuery object. + try { + cq1 = cqService.getCq(cqName); + if (cq1 == null) { + getLogWriter().info("Failed to get CqQuery object for CQ name: " + cqName); + fail("Failed to get CQ " + cqName); + } + else { + getLogWriter().info("Obtained CQ, CQ name: " + cq1.getName()); + assertTrue("newCq() state mismatch", cq1.getState().isStopped()); + } + } catch (Exception ex){ + getLogWriter().info("CqService is :" + cqService); + getLogWriter().error(ex); + AssertionError err = new AssertionError("Failed to execute CQ " + cqName); + err.initCause(ex); + throw err; + } + + if (initialResults) { + SelectResults cqResults = null; + + try { + cqResults = cq1.executeWithInitialResults(); + } catch (Exception ex){ + getLogWriter().info("CqService is :" + cqService); + ex.printStackTrace(); + AssertionError err = new AssertionError("Failed to execute CQ " + cqName); + err.initCause(ex); + throw err; + } + getLogWriter().info("initial result size = " + cqResults.size()); + assertTrue("executeWithInitialResults() state mismatch", cq1.getState().isRunning()); + if (expectedResultsSize >= 0) { + assertEquals("Unexpected results size for CQ: " + cqName + + " CQ Query :" + cq1.getQueryString(), + expectedResultsSize, cqResults.size()); + } + + if (expectedKeys != null) { + HashSet resultKeys = new HashSet(); + for (Object o : cqResults.asList()) { + Struct s = (Struct)o; + resultKeys.add(s.get("key")); + } + for (int i =0; i < expectedKeys.length; i++){ + assertTrue("Expected key :" + expectedKeys[i] + + " Not found in CqResults for CQ: " + cqName + + " CQ Query :" + cq1.getQueryString() + + " Keys in CqResults :" + resultKeys, + resultKeys.contains(expectedKeys[i])); + } + } + } + else { + try { + cq1.execute(); + } catch (Exception ex){ + AssertionError err = new AssertionError("Failed to execute CQ " + cqName); + err.initCause(ex); + if (expectedErr == null) { + getLogWriter().info("CqService is :" + cqService, err); + } + throw err; + } + assertTrue("execute() state mismatch", cq1.getState().isRunning()); + } + } + + public void run2() throws CacheException { + if (expectedErr != null) { + getCache().getLogger().info("<ExpectedException action=add>" + + expectedErr + "</ExpectedException>"); + } + try { + work(); + } + finally { + if (expectedErr != null) { + getCache().getLogger().info("<ExpectedException action=remove>" + + expectedErr + "</ExpectedException>"); + } + } + } + }); + } + + /* Stop/pause CQ */ + public void stopCQ(VM vm, final String cqName) throws Exception { + vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) { + public void run2() throws CacheException { + getLogWriter().info("### Stop CQ. ###" + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + // Stop CQ. + CqQuery cq1 = null; + try { + cq1 = cqService.getCq(cqName); + cq1.stop(); + } catch (Exception ex){ + fail("Failed to stop CQ " + cqName + " . ", ex); + } + assertTrue("Stop CQ state mismatch", cq1.getState().isStopped()); + } + }); + } + + // Stop and execute CQ repeatedly + /* Stop/pause CQ */ + private void stopExecCQ(VM vm, final String cqName, final int count) throws Exception { + vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) { + public void run2() throws CacheException { + CqQuery cq1 = null; + getLogWriter().info("### Stop and Exec CQ. ###" + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCqService.", cqe); + } + + // Get CQ. + try { + cq1 = cqService.getCq(cqName); + } catch (Exception ex){ + fail("Failed to get CQ " + cqName + " . ", ex); + } + + for (int i = 0; i < count; ++i) { + // Stop CQ. + try { + cq1.stop(); + } catch (Exception ex) { + fail("Count = " + i + "Failed to stop CQ " + cqName + " . ", ex); + } + assertTrue("Stop CQ state mismatch, count = " + i, cq1.getState().isStopped()); + getLogWriter().info("After stop in Stop and Execute loop, ran successfully, loop count: " + i); + getLogWriter().info("CQ state: " + cq1.getState()); + + // Re-execute CQ + try { + cq1.execute(); + } catch (Exception ex) { + fail("Count = " + i + "Failed to execute CQ " + cqName + " . ", ex); + } + assertTrue("Execute CQ state mismatch, count = " + i, cq1.getState().isRunning()); + getLogWriter().info("After execute in Stop and Execute loop, ran successfully, loop count: " + i); + getLogWriter().info("CQ state: " + cq1.getState()); + } + } + }); + } + + + /* UnRegister CQs */ + public void closeCQ(VM vm, final String cqName) throws Exception { + vm.invoke(new CacheSerializableRunnable("Close CQ :" + cqName) { + public void run2() throws CacheException { + getLogWriter().info("### Close CQ. ###" + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCqService.", cqe); + } + + // Close CQ. + CqQuery cq1 = null; + try { + cq1 = cqService.getCq(cqName); + cq1.close(); + } catch (Exception ex) { + fail("Failed to close CQ " + cqName + " . ", ex); + } + assertTrue("Close CQ state mismatch", cq1.getState().isClosed()); + } + }); + } + + /* Register CQs */ + public void registerInterestListCQ(VM vm, final String regionName, final int keySize, final boolean all) { + vm.invoke(new CacheSerializableRunnable("Register InterestList and CQ") { + public void run2() throws CacheException { + + // Get CQ Service. + Region region = null; + try { + region = getRootRegion().getSubregion(regionName); + region.getAttributesMutator().setCacheListener(new CertifiableTestCacheListener(getLogWriter())); + } catch (Exception cqe) { + AssertionError err = new AssertionError("Failed to get Region."); + err.initCause(cqe); + throw err; + + } + + try { + if (all) { + region.registerInterest("ALL_KEYS"); + } else { + List list = new ArrayList(); + for (int i = 1; i <= keySize; i++) { + list.add(KEY+i); + } + region.registerInterest(list); + } + } catch (Exception ex) { + AssertionError err = new AssertionError("Failed to Register InterestList"); + err.initCause(ex); + throw err; + } + } + }); + } + + /* Validate CQ Count */ + public void validateCQCount(VM vm, final int cqCnt) throws Exception { + vm.invoke(new CacheSerializableRunnable("validate cq count") { + public void run2() throws CacheException { + // Get CQ Service. + + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + int numCqs = 0; + try { + numCqs = cqService.getCqs().length; + } catch (Exception ex) { + fail ("Failed to get the CQ Count.", ex); + } + assertEquals("Number of cqs mismatch.", cqCnt, numCqs); + } + }); + } + + + /** + * Throws AssertionError if the CQ can be found or if any other + * error occurs + */ + private void failIfCQExists(VM vm, final String cqName) { + vm.invoke(new CacheSerializableRunnable("Fail if CQ exists") { + public void run2() throws CacheException { + getLogWriter().info("### Fail if CQ Exists. ### " + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery != null) { + fail("Unexpectedly found CqQuery for CQ : " + cqName); + } + } + }); + } + + private void validateCQError(VM vm, final String cqName, + final int numError) { + vm.invoke(new CacheSerializableRunnable("Validate CQs") { + public void run2() throws CacheException { + + getLogWriter().info("### Validating CQ. ### " + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery == null) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + + CqAttributes cqAttr = cQuery.getCqAttributes(); + CqListener cqListener = cqAttr.getCqListener(); + CqQueryTestListener listener = (CqQueryTestListener) cqListener; + listener.printInfo(false); + + // Check for totalEvents count. + if (numError != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Total Event Count mismatch", numError, listener.getErrorEventCount()); + } + } + }); + } + + public void validateCQ(VM vm, final String cqName, + final int resultSize, + final int creates, + final int updates, + final int deletes) { + validateCQ(vm, cqName, resultSize, creates, updates, deletes, + noTest, noTest, noTest, noTest); + } + + public void validateCQ(VM vm, final String cqName, + final int resultSize, + final int creates, + final int updates, + final int deletes, + final int queryInserts, + final int queryUpdates, + final int queryDeletes, + final int totalEvents) { + vm.invoke(new CacheSerializableRunnable("Validate CQs") { + public void run2() throws CacheException { + getLogWriter().info("### Validating CQ. ### " + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery == null) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + + CqAttributes cqAttr = cQuery.getCqAttributes(); + CqListener cqListeners[] = cqAttr.getCqListeners(); + CqQueryTestListener listener = (CqQueryTestListener) cqListeners[0]; + listener.printInfo(false); + + // Check for totalEvents count. + if (totalEvents != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Total Event Count mismatch", totalEvents, listener.getTotalEventCount()); + } + + if (resultSize != noTest) { + //SelectResults results = cQuery.getCqResults(); + //getLogWriter().info("### CQ Result Size is :" + results.size()); + // Result size validation. + // Since ResultSet is not maintained for this release. + // Instead of resultSize its been validated with total number of events. + fail("test for event counts instead of results size"); +// assertEquals("Result Size mismatch", resultSize, listener.getTotalEventCount()); + } + + // Check for create count. + if (creates != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Create Event mismatch", creates, listener.getCreateEventCount()); + } + + // Check for update count. + if (updates != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Update Event mismatch", updates, listener.getUpdateEventCount()); + } + + // Check for delete count. + if (deletes != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Delete Event mismatch", deletes, listener.getDeleteEventCount()); + } + + // Check for queryInsert count. + if (queryInserts != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Query Insert Event mismatch", queryInserts, listener.getQueryInsertEventCount()); + } + + // Check for queryUpdate count. + if (queryUpdates != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Query Update Event mismatch", queryUpdates, listener.getQueryUpdateEventCount()); + } + + // Check for queryDelete count. + if (queryDeletes != noTest) { + // Result size validation. + listener.printInfo(true); + assertEquals("Query Delete Event mismatch", queryDeletes, listener.getQueryDeleteEventCount()); + } + } + }); + } + + public void waitForCreated(VM vm, final String cqName, final String key){ + waitForEvent(vm, 0, cqName, key); + } + + public void waitForUpdated(VM vm, final String cqName, final String key){ + waitForEvent(vm, 1, cqName, key); + } + + public void waitForDestroyed(VM vm, final String cqName, final String key){ + waitForEvent(vm, 2, cqName, key); + } + + public void waitForInvalidated(VM vm, final String cqName, final String key){ + waitForEvent(vm, 3, cqName, key); + } + + public void waitForClose(VM vm, final String cqName){ + waitForEvent(vm, 4, cqName, null); + } + + public void waitForRegionClear(VM vm, final String cqName){ + waitForEvent(vm, 5, cqName, null); + } + + public void waitForRegionInvalidate(VM vm, final String cqName){ + waitForEvent(vm, 6, cqName, null); + } + + private void waitForEvent(VM vm, final int event, final String cqName, final String key) { + vm.invoke(new CacheSerializableRunnable("validate cq count") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery == null) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + + CqAttributes cqAttr = cQuery.getCqAttributes(); + CqListener[] cqListener = cqAttr.getCqListeners(); + CqQueryTestListener listener = (CqQueryTestListener) cqListener[0]; + + switch (event) { + case CREATE : + listener.waitForCreated(key); + break; + + case UPDATE : + listener.waitForUpdated(key); + break; + + case DESTROY : + listener.waitForDestroyed(key); + break; + + case INVALIDATE : + listener.waitForInvalidated(key); + break; + + case CLOSE : + listener.waitForClose(); + break; + + case REGION_CLEAR : + listener.waitForRegionClear(); + break; + + case REGION_INVALIDATE : + listener.waitForRegionInvalidate(); + break; + + } + } + }); + } + + /** + * Waits till the CQ state is same as the expected. + * Waits for max time, if the CQ state is not same as expected + * throws exception. + */ + public void waitForCqState(VM vm, final String cqName, final int state) { + vm.invoke(new CacheSerializableRunnable("Wait For cq State") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery == null) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + + // Get CQ State. + CqStateImpl cqState = (CqStateImpl)cQuery.getState(); + // Wait max time, till the CQ state is as expected. + final long start = System.currentTimeMillis(); + while (cqState.getState() != state) { + junit.framework.Assert.assertTrue("Waited over " + MAX_TIME + + "ms for Cq State to be changed to " + state + + "; consider raising " + WAIT_PROPERTY, + (System.currentTimeMillis() - start) < MAX_TIME); + pause(100); + } + } + }); + } + + public void clearCQListenerEvents(VM vm, final String cqName) { + vm.invoke(new CacheSerializableRunnable("validate cq count") { + public void run2() throws CacheException { + // Get CQ Service. + + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + CqQuery cQuery = cqService.getCq(cqName); + if (cQuery == null) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + + CqAttributes cqAttr = cQuery.getCqAttributes(); + CqListener cqListener = cqAttr.getCqListener(); + CqQueryTestListener listener = (CqQueryTestListener) cqListener; + listener.getEventHistory(); + } + }); + } + + public void validateQuery(VM vm, final String query, final int resultSize) { + vm.invoke(new CacheSerializableRunnable("Validate Query") { + public void run2() throws CacheException { + getLogWriter().info("### Validating Query. ###"); + QueryService qs = getCache().getQueryService(); + + Query q = qs.newQuery(query); + try { + Object r = q.execute(); + if(r instanceof Collection){ + int rSize = ((Collection)r).size(); + getLogWriter().info("### Result Size is :" + rSize); + assertEquals(rSize, resultSize); + } + } + catch (Exception e) { + fail("Failed to execute the query.", e); + } + } + }); + } + + private Properties getConnectionProps(String[] hosts, int[] ports, Properties newProps) { + + Properties props = new Properties(); + String endPoints = ""; + String host = hosts[0]; + for (int i=0; i < ports.length; i++){ + if (hosts.length > 1) + { + host = hosts[i]; + } + endPoints = endPoints + "server" + i + "=" + host + ":" + ports[i]; + if (ports.length > (i+1)) + { + endPoints = endPoints + ","; + } + } + + props.setProperty("endpoints", endPoints); + props.setProperty("retryAttempts", "1"); + //props.setProperty("establishCallbackConnection", "true"); + //props.setProperty("LBPolicy", "Sticky"); + //props.setProperty("readTimeout", "120000"); + + // Add other property elements. + if (newProps != null) { + Enumeration e = newProps.keys(); + while(e.hasMoreElements()) { + String key = (String)e.nextElement(); + props.setProperty(key, newProps.getProperty(key)); + } + } + return props; + } + + + // Exercise CQ attributes mutator functions + private void mutateCQAttributes(VM vm, final String cqName, final int mutator_function) throws Exception { + vm.invoke(new CacheSerializableRunnable("Stop CQ :" + cqName) { + public void run2() throws CacheException { + CqQuery cq1 = null; + getLogWriter().info("### CQ attributes mutator for ###" + cqName); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + // Get CQ. + try { + cq1 = cqService.getCq(cqName); + } catch (Exception ex) { + fail("Failed to get CQ " + cqName + " . ", ex); + } + CqAttributesMutator cqAttrMutator = cq1.getCqAttributesMutator(); + CqAttributes cqAttr = cq1.getCqAttributes(); + CqListener cqListeners[]; + switch (mutator_function) { + case CREATE: + // Reinitialize with 2 CQ Listeners + CqListener cqListenersArray[] = {new CqQueryTestListener(getCache().getLogger()), + new CqQueryTestListener(getCache().getLogger())}; + cqAttrMutator.initCqListeners(cqListenersArray); + cqListeners = cqAttr.getCqListeners(); + assertEquals("CqListener count mismatch", cqListeners.length, 2); + break; + + case UPDATE: + // Add 2 new CQ Listeners + CqListener newListener1 = new CqQueryTestListener(getCache().getLogger()); + CqListener newListener2 = new CqQueryTestListener(getCache().getLogger()); + cqAttrMutator.addCqListener(newListener1); + cqAttrMutator.addCqListener(newListener2); + + cqListeners = cqAttr.getCqListeners(); + assertEquals("CqListener count mismatch", cqListeners.length, 3); + break; + + case DESTROY: + cqListeners = cqAttr.getCqListeners(); + cqAttrMutator.removeCqListener(cqListeners[0]); + cqListeners = cqAttr.getCqListeners(); + assertEquals("CqListener count mismatch", cqListeners.length, 2); + + // Remove a listener and validate + cqAttrMutator.removeCqListener(cqListeners[0]); + cqListeners = cqAttr.getCqListeners(); + assertEquals("CqListener count mismatch", cqListeners.length, 1); + break; + } + } + }); + } + + + + + /** + * Test for InterestList and CQ registered from same clients. + * @throws Exception + */ + public void testInterestListAndCQs() throws Exception { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + /* Init Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testInterestListAndCQs"; + createPool(client, poolName, host0, thePort); + + createClient(client, thePort, host0); + + /* Create CQs. */ + createCQ(client, poolName, "testInterestListAndCQs_0", cqs[0]); + validateCQCount(client, 1); + + /* Init values at server. */ + final int size = 10; + + executeCQ(client, "testInterestListAndCQs_0", false, null); + registerInterestListCQ(client, regions[0], size, false); + + createValues(server, regions[0], size); + // Wait for client to Synch. + + for (int i=1; i <=10; i++){ + waitForCreated(client, "testInterestListAndCQs_0", KEY + i); + } + pause(5 * 1000); + + // validate CQs. + validateCQ(client, "testInterestListAndCQs_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ noTest, + /* deletes; */ noTest, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + + // Validate InterestList. + // CREATE + client.invoke(new CacheSerializableRunnable("validate updates") { + public void run2() throws CacheException { + final Region region = getRootRegion().getSubregion(regions[0]); + assertNotNull(region); + +// Set keys = region.entrySet(); +// assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", +// size, keys.size()); + // TODO does this WaitCriterion actually help? + WaitCriterion wc = new WaitCriterion() { + String excuse; + public boolean done() { + int sz = region.entrySet().size(); + if (sz == size) { + return true; + } + excuse = "Mismatch, number of keys (" + sz + + ") in local region is not equal to the interest list size (" + + size + ")"; + return false; + } + public String description() { + return excuse; + } + }; + DistributedTestCase.waitForCriterion(wc, 60 * 1000, 1000, true); + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + for (int i = 1; i <= 10; i++) { + ctl.waitForCreated(KEY+i); + assertNotNull(region.getEntry(KEY+i)); + } + } + }); + + // UPDATE + createValues(server, regions[0], size); + // Wait for client to Synch. + for (int i=1; i <=10; i++){ + waitForUpdated(client, "testInterestListAndCQs_0", KEY + i); + } + + + client.invoke(new CacheSerializableRunnable("validate updates") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(regions[0]); + assertNotNull(region); + + Set keys = region.entrySet(); + assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", + size, keys.size()); + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + for (int i = 1; i <= 10; i++) { + ctl.waitForUpdated(KEY+i); + assertNotNull(region.getEntry(KEY+i)); + } + } + }); + + // INVALIDATE + server.invoke(new CacheSerializableRunnable("Invalidate values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regions[0]); + for (int i = 1; i <= size; i++) { + region1.invalidate(KEY+i); + } + } + }); + + + waitForInvalidated(client, "testInterestListAndCQs_0", KEY + 10); + + + client.invoke(new CacheSerializableRunnable("validate invalidates") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(regions[0]); + assertNotNull(region); + + Set keys = region.entrySet(); + assertEquals("Mismatch, number of keys in local region is not equal to the interest list size", + size, keys.size()); + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + for (int i = 1; i <= 10; i++) { + ctl.waitForInvalidated(KEY+i); + assertNotNull(region.getEntry(KEY+i)); + } + } + }); + + validateCQ(client, "testInterestListAndCQs_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ size, + /* deletes; */ noTest, + /* queryInserts: */ size, + /* queryUpdates: */ size, + /* queryDeletes: */ size, + /* totalEvents: */ size * 3); + + // DESTROY - this should not have any effect on CQ, as the events are + // already destroyed from invalidate events. + server.invoke(new CacheSerializableRunnable("Invalidate values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regions[0]); + for (int i = 1; i <= size; i++) { + region1.destroy(KEY+i); + } + } + }); + + // Wait for destroyed. + client.invoke(new CacheSerializableRunnable("validate destroys") { + public void run2() throws CacheException { + Region region = getRootRegion().getSubregion(regions[0]); + assertNotNull(region); + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) region.getAttributes().getCacheListener(); + for (int i = 1; i <= 10; i++) { + ctl.waitForDestroyed(KEY+i); + } + } + }); + + validateCQ(client, "testInterestListAndCQs_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ size, + /* deletes; */ noTest, + /* queryInserts: */ size, + /* queryUpdates: */ size, + /* queryDeletes: */ size, + /* totalEvents: */ size * 3); + + closeClient(client); + closeServer(server); + } + + + /** + * Test for CQ register and UnRegister. + * @throws Exception + */ + public void testCQStopExecute() throws Exception { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + /* Init Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQStopExecute"; + createPool(client, poolName, host0, thePort); + + //createClient(client, thePort, host0); + + /* Create CQs. */ + createCQ(client, poolName, "testCQStopExecute_0", cqs[0]); + validateCQCount(client, 1); + + executeCQ(client, "testCQStopExecute_0", false, null); + + /* Init values at server. */ + int size = 10; + createValues(server, regions[0], size); + // Wait for client to Synch. + + waitForCreated(client, "testCQStopExecute_0", KEY+size); + + + // Check if Client and Server in sync. + //validateServerClientRegionEntries(server, client, regions[0]); + validateQuery(server, cqs[0], 10); + // validate CQs. + //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest); + validateCQ(client, "testCQStopExecute_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Test CQ stop + stopCQ(client, "testCQStopExecute_0"); + + // Test CQ re-enable + executeCQ(client, "testCQStopExecute_0", false, null); + + /* Init values at server. */ + createValues(server, regions[0], 20); + // Wait for client to Synch. + waitForCreated(client, "testCQStopExecute_0", KEY+20); + size = 30; + + // Check if Client and Server in sync. + //validateServerClientRegionEntries(server, client, regions[0]); + validateQuery(server, cqs[0], 20); + // validate CQs. + //validateCQ(client, "testCQStopExecute_0", size, noTest, noTest, noTest); + validateCQ(client, "testCQStopExecute_0", + /* resultSize: */ noTest, + /* creates: */ 20, + /* updates: */ 10, + /* deletes; */ 0, + /* queryInserts: */ 20, + /* queryUpdates: */ 10, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + + // Stop and execute CQ 20 times + stopExecCQ(client, "testCQStopExecute_0", 20); + + // Test CQ Close + closeCQ(client, "testCQStopExecute_0"); + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * Test for CQ Attributes Mutator functions + * @throws Exception + */ + public void testCQAttributesMutator() throws Exception { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + /* Init Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQAttributesMutator"; + createPool(client, poolName, host0, thePort); + //createClient(client, thePort, host0); + + /* Create CQs. */ + String cqName = new String("testCQAttributesMutator_0"); + createCQ(client, poolName, cqName, cqs[0]); + validateCQCount(client, 1); + executeCQ(client,cqName, false, null); + + /* Init values at server. */ + int size = 10; + createValues(server, regions[0], size); + // Wait for client to Synch. + waitForCreated(client, cqName, KEY + size); + + // validate CQs. + validateCQ(client, cqName, + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Add 2 new CQ Listeners + mutateCQAttributes(client, cqName, UPDATE); + + /* Init values at server. */ + createValues(server, regions[0], size * 2); + waitForCreated(client, cqName, KEY + (size * 2)); + + validateCQ(client, cqName, + /* resultSize: */ noTest, + /* creates: */ 20, + /* updates: */ 10, + /* deletes; */ 0, + /* queryInserts: */ 20, + /* queryUpdates: */ 10, + /* queryDeletes: */ 0, + /* totalEvents: */ 30); + + // Remove 2 listeners and validate + mutateCQAttributes(client, cqName, DESTROY); + + validateCQ(client, cqName, + /* resultSize: */ noTest, + /* creates: */ 10, + /* updates: */ 10, + /* deletes; */ 0, + /* queryInserts: */ 10, + /* queryUpdates: */ 10, + /* queryDeletes: */ 0, + /* totalEvents: */ 20); + + // Reinitialize with 2 CQ Listeners + mutateCQAttributes(client, cqName, CREATE); + + /* Delete values at server. */ + deleteValues(server, regions[0], 20); + // Wait for client to Synch. + waitForDestroyed(client, cqName, KEY + (size * 2)); + + validateCQ(client, cqName, + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ 0, + /* deletes; */ 20, + /* queryInserts: */ 0, + /* queryUpdates: */ 0, + /* queryDeletes: */ 20, + /* totalEvents: */ 20); + + // Close CQ + closeCQ(client, cqName); + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * Test for CQ register and UnRegister. + * @throws Exception + */ + public void testCQCreateClose() throws Exception { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + /* Init Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQCreateClose"; + System.out.println("##### Pool Name :" + poolName + " host :" + host0 + " port :" + thePort); + createPool(client, poolName, host0, thePort); + + // createClient(client, thePort, host0); + + /* debug */ + //getLogWriter().info("### DEBUG STOP ####"); + //pause(60 * 1000); + //getLogWriter().info("### DEBUG START ####"); + + /* Create CQs. */ + createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); + validateCQCount(client, 1); + + executeCQ(client, "testCQCreateClose_0", false, null); + + /* Init values at server. */ + int size = 10; + createValues(server, regions[0], size); + // Wait for client to Synch. + waitForCreated(client, "testCQCreateClose_0", KEY+size); + + // Check if Client and Server in sync. + //validateServerClientRegionEntries(server, client, regions[0]); + validateQuery(server, cqs[0], 10); + // validate CQs. + validateCQ(client, "testCQCreateClose_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Test CQ stop + stopCQ(client, "testCQCreateClose_0"); + + // Test CQ re-enable + executeCQ(client, "testCQCreateClose_0", false, null); + + // Test CQ Close + closeCQ(client, "testCQCreateClose_0"); + + //Create CQs with no name, execute, and close. + // UNCOMMENT.... + createAndExecCQNoName(client, poolName, cqs[0]); + + // Accessing the closed CQ. + failIfCQExists(client, "testCQCreateClose_0"); + + // re-Create the cq which is closed. + createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); + + /* Test CQ Count */ + validateCQCount(client, 1); + + // Registering CQ with same name from same client. + try { + createCQ(client, poolName, "testCQCreateClose_0", cqs[0]); + fail("Trying to create CQ with same name. Should have thrown CQExistsException"); + } catch (dunit.RMIException rmiExc) { + Throwable cause = rmiExc.getCause(); + assertTrue("unexpected cause: " + cause.getClass().getName(), cause instanceof AssertionError); + Throwable causeCause = cause.getCause(); // should be a CQExistsException + assertTrue("Got wrong exception: " + causeCause.getClass().getName(), + causeCause instanceof CqExistsException); + } + + // Getting values from non-existent CQ. + failIfCQExists(client, "testCQCreateClose_NO"); + + // Server Registering CQ. + try { + createCQ(server, "testCQCreateClose_1", cqs[0]); + fail("Trying to create CQ on Cache Server. Should have thrown Exception."); + } catch (dunit.RMIException rmiExc) { + Throwable cause = rmiExc.getCause(); + assertTrue("unexpected cause: " + cause.getClass().getName(), + cause instanceof AssertionError); + Throwable causeCause = cause.getCause(); // should be a IllegalStateException + assertTrue("Got wrong exception: " + causeCause.getClass().getName(), + causeCause instanceof IllegalStateException); + } + + validateCQCount(client, 1); + + createCQ(client, poolName, "testCQCreateClose_3", cqs[2]); + + validateCQCount(client, 2); + + /* Test for closeAllCQs() */ + + client.invoke(new CacheSerializableRunnable("CloseAll CQ :") { + public void run2() throws CacheException { + getLogWriter().info("### Close All CQ. ###"); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + getLogWriter().info("Failed to getCQService.", cqe); + fail("Failed to getCQService.", cqe); + } + + // Close CQ. + try { + cqService.closeCqs(); + } catch (Exception ex) { + getLogWriter().info("Failed to close All CQ.", ex); + fail("Failed to close All CQ. ", ex); + } + } + }); + + validateCQCount(client, 0); + + // Initialize. + createCQ(client, poolName, "testCQCreateClose_2", cqs[1]); + createCQ(client, poolName, "testCQCreateClose_4", cqs[1]); + createCQ(client, poolName, "testCQCreateClose_5", cqs[1]); + + // Execute few of the initialized cqs + executeCQ(client, "testCQCreateClose_4", false, null); + executeCQ(client, "testCQCreateClose_5", false, null); + + // Call close all CQ. + client.invoke(new CacheSerializableRunnable("CloseAll CQ 2 :") { + public void run2() throws CacheException { + getLogWriter().info("### Close All CQ 2. ###"); + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + + // Close CQ. + try { + cqService.closeCqs(); + } catch (Exception ex) { + fail("Failed to close All CQ . ", ex); + } + } + }); + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * This will test the events after region destory. + * The CQs on the destroy region needs to be closed. + * + */ + public void testRegionDestroy() throws Exception { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + /* Init Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testRegionDestroy"; + createPool(client, poolName, host0, thePort); + + createClient(client, thePort, host0); + + /* Create CQs. */ + createCQ(client, poolName, "testRegionDestroy_0", cqs[0]); + createCQ(client, poolName, "testRegionDestroy_1", cqs[0]); + createCQ(client, poolName, "testRegionDestroy_2", cqs[0]); + + executeCQ(client, "testRegionDestroy_0", false, null); + executeCQ(client, "testRegionDestroy_1", false, null); + executeCQ(client, "testRegionDestroy_2", false, null); + + /* Init values at server. */ + final int size = 10; + registerInterestListCQ(client, regions[0], size, false); + createValues(server, regions[0], size); + + // Wait for client to Synch. + + waitForCreated(client, "testRegionDestroy_0", KEY + 10); + + + // validate CQs. + validateCQ(client, "testRegionDestroy_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ noTest, + /* deletes; */ noTest, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Validate InterestList. + // CREATE + client.invoke(new CacheSerializableRunnable("validate updates") { + public void run2() throws CacheException { + // Wait for the region to become the correct size + WaitCriterion wc = new WaitCriterion() { + String excuse; + public boolean done() { + Region region = getRootRegion().getSubregion(regions[0]); + if (region == null) { + excuse = "Can't find region"; + return false; + } + int sz = region.entrySet().size(); + if (sz != size) { + excuse = "Region is of size " + sz + ", expected " + size; + return false; + } + return true; + } + public String description() { + return excuse; + } + }; + DistributedTestCase.waitForCriterion(wc, 30 * 1000, 250, true); + + Region region = getRootRegion().getSubregion(regions[0]); + assertNotNull(region); + + CertifiableTestCacheListener ctl = (CertifiableTestCacheListener) + region.getAttributes().getCacheListener(); + for (int i = 1; i <= 10; i++) { + ctl.waitForCreated(KEY+i); + assertNotNull(region.getEntry(KEY+i)); + } + } + }); + + // Destroy Region. + server.invoke(new CacheSerializableRunnable("Destroy Region") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regions[0]); + region1.destroyRegion(); + } + }); + + pause(2 * 1000); + validateCQCount(client, 0); + + closeClient(client); + closeServer(server); + + } + + /** + * Test for CQ with multiple clients. + */ + public void testCQWithMultipleClients() throws Exception { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client1 = host.getVM(1); + VM client2 = host.getVM(2); + VM client3 = host.getVM(3); + + /* Create Server and Client */ + createServer(server); + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName1 = "testCQWithMultipleClients1"; + String poolName2 = "testCQWithMultipleClients2"; + String poolName3 = "testCQWithMultipleClients3"; + + createPool(client1, poolName1, host0, thePort); + createPool(client2, poolName2, host0, thePort); + + /* Create CQs. and initialize the region */ + createCQ(client1, poolName1, "testCQWithMultipleClients_0", cqs[0]); + executeCQ(client1, "testCQWithMultipleClients_0", false, null); + + createCQ(client2, poolName2, "testCQWithMultipleClients_0", cqs[0]); + executeCQ(client2, "testCQWithMultipleClients_0", false, null); + + int size = 10; + + // Create Values on Server. + createValues(server, regions[0], size); + + waitForCreated(client1, "testCQWithMultipleClients_0", KEY + 10); + + /* Validate the CQs */ + validateCQ(client1, "testCQWithMultipleClients_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + waitForCreated(client2, "testCQWithMultipleClients_0", KEY + 10); + + validateCQ(client2, "testCQWithMultipleClients_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + + /* Close test */ + closeCQ(client1, "testCQWithMultipleClients_0"); + + validateCQ(client2, "testCQWithMultipleClients_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + /* Init new client and create cq */ + createPool(client3, poolName3, host0, thePort); + + createCQ(client3, poolName3, "testCQWithMultipleClients_0", cqs[0]); + createCQ(client3, poolName3, "testCQWithMultipleClients_1", cqs[1]); + executeCQ(client3, "testCQWithMultipleClients_0", false, null); + executeCQ(client3, "testCQWithMultipleClients_1", false, null); + + // Update values on Server. This will be updated on new Client CQs. + createValues(server, regions[0], size); + + + waitForUpdated(client3, "testCQWithMultipleClients_0", KEY + 10); + + + validateCQ(client3, "testCQWithMultipleClients_0", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ size, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ size, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + validateCQ(client3, "testCQWithMultipleClients_1", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ 1, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ 1, + /* queryDeletes: */ 0, + /* totalEvents: */ 1); + + /* Validate the CQ count */ + validateCQCount(client1, 0); + validateCQCount(client2, 1); + validateCQCount(client3, 2); + + /* Close Client Test */ + closeClient(client1); + + clearCQListenerEvents(client2, "testCQWithMultipleClients_0"); + clearCQListenerEvents(client3, "testCQWithMultipleClients_1"); + + // Update values on server, update again. + createValues(server, regions[0], size); + + + waitForUpdated(client2, "testCQWithMultipleClients_0", KEY + 10); + + + validateCQ(client2, "testCQWithMultipleClients_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ size * 2, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ size * 2, + /* queryDeletes: */ 0, + /* totalEvents: */ size * 3); + + waitForUpdated(client3, "testCQWithMultipleClients_1", KEY + 2); + + validateCQ(client3, "testCQWithMultipleClients_1", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ 2, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ 2, + /* queryDeletes: */ 0, + /* totalEvents: */ 2); + + /* Close Server and Client */ + closeClient(client2); + closeClient(client3); + closeServer(server); + } + + /** + * Test for CQ ResultSet. + */ + public void testCQResultSet() throws Exception { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + createServer(server); + + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQResultSet"; + createPool(client, poolName, host0, thePort); + + // Create client. + // createClient(client, thePort, host0); + + /* CQ Test with initial Values. */ + int size = 10; + createValues(server, regions[0], size); + pause(1*500); + + // Create CQs. + createCQ(client, poolName, "testCQResultSet_0", cqs[0]); + + // Check resultSet Size. + executeCQ(client, "testCQResultSet_0", true, 10, null, null); + + /* CQ Test with no Values on Region */ + createCQ(client, poolName, "testCQResultSet_1", cqs[2]); + // Check resultSet Size. + executeCQ(client, "testCQResultSet_1", true, 0, null, null); + stopCQ(client, "testCQResultSet_1"); + + // Init values. + createValues(server, regions[1], 5); + validateQuery(server, cqs[2], 2); + + executeCQ(client, "testCQResultSet_1", true, 2, null, null); + + /* compare values... + Disabled since we don't currently maintain results on the client + + validateCQ(client, "testCQResultSet_1", 2, noTest, noTest, noTest); + Portfolio[] values = new Portfolio[] {new Portfolio(2), new Portfolio(4)}; + Hashtable t = new Hashtable(); + String[] keys = new String[] {"key-2", "key-4"}; + t.put(keys[0], values[0]); + t.put(keys[1], values[1]); + + compareValues(client, "testCQResultSet_1", t); + + deleteValues(server, regions[1], 3); + t.remove("key-4"); + pause(2 * 1000); + + try { + compareValues(client, "testCQResultSet_1", t); + fail("Should have thrown Exception. The value should not be present in cq results region"); + } + catch (Exception ex) { // @todo check for specific exception type + } + + */ + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * Test for CQ Listener events. + * + */ + public void testCQEvents() throws Exception { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + createServer(server); + + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQEvents"; + createPool(client, poolName, host0, thePort); + + // Create client. + //createClient(client, thePort, host0); + + // Create CQs. + createCQ(client, poolName, "testCQEvents_0", cqs[0]); + + executeCQ(client, "testCQEvents_0", false, null); + + // Init values at server. + int size = 10; + createValues(server, regions[0], size); + + waitForCreated(client, "testCQEvents_0", KEY+size); + + // validate Create events. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Update values. + createValues(server, regions[0], 5); + createValues(server, regions[0], 10); + + waitForUpdated(client, "testCQEvents_0", KEY+size); + + + // validate Update events. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 0, + /* totalEvents: */ size + 15); + + // Validate delete events. + deleteValues(server, regions[0], 5); + waitForDestroyed(client, "testCQEvents_0", KEY+5); + + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */5, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 5, + /* totalEvents: */ size + 15 + 5); + + // Insert invalid Events. + server.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regions[0]); + for (int i = -1; i >= -5; i--) { + //change : new Portfolio(i) rdubey ( for Suspect strings problem). + //region1.put(KEY+i, new Portfolio(i) ); + region1.put(KEY+i, KEY+i); + } + } + }); + + pause(1 * 1000); + // cqs should not get any creates, deletes or updates. rdubey. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */5, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 5, + /* totalEvents: */ size + 15 + 5); + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * Test query execution multiple times on server without ALIAS. + * @throws Exception + */ + public void testCqEventsWithoutAlias() throws Exception { + + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + createServer(server); + + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCQEvents"; + createPool(client, poolName, host0, thePort); + + // Create client. + //createClient(client, thePort, host0); + + // Create CQs. + createCQ(client, poolName, "testCQEvents_0", cqs[11]); + + executeCQ(client, "testCQEvents_0", false, null); + + // Init values at server. + int size = 10; + createValues(server, regions[0], size); + + waitForCreated(client, "testCQEvents_0", KEY+size); + + // validate Create events. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + // Update values. + createValues(server, regions[0], 5); + createValues(server, regions[0], 10); + + waitForUpdated(client, "testCQEvents_0", KEY+size); + + + // validate Update events. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */ 0, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 0, + /* totalEvents: */ size + 15); + + // Validate delete events. + deleteValues(server, regions[0], 5); + waitForDestroyed(client, "testCQEvents_0", KEY+5); + + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */5, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 5, + /* totalEvents: */ size + 15 + 5); + + // Insert invalid Events. + server.invoke(new CacheSerializableRunnable("Create values") { + public void run2() throws CacheException { + Region region1 = getRootRegion().getSubregion(regions[0]); + for (int i = -1; i >= -5; i--) { + //change : new Portfolio(i) rdubey ( for Suspect strings problem). + //region1.put(KEY+i, new Portfolio(i) ); + region1.put(KEY+i, KEY+i); + } + } + }); + + pause(1 * 1000); + // cqs should not get any creates, deletes or updates. rdubey. + validateCQ(client, "testCQEvents_0", + /* resultSize: */ noTest, + /* creates: */ size, + /* updates: */ 15, + /* deletes; */5, + /* queryInserts: */ size, + /* queryUpdates: */ 15, + /* queryDeletes: */ 5, + /* totalEvents: */ size + 15 + 5); + + // Close. + closeClient(client); + closeServer(server); + } + /** + * Test for stopping and restarting CQs. + * @throws Exception + */ + public void testEnableDisableCQ() throws Exception { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + createServer(server); + + final int thePort = server.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testEnableDisableCQ"; + createPool(client, poolName, host0, thePort); + + // Create client. + //createClient(client, thePort, host0); + + // Create CQs. + createCQ(client, poolName, "testEnableDisable_0", cqs[0]); + executeCQ(client, "testEnableDisable_0", false, null); + + /* Test for disableCQ */ + client.invoke(new CacheSerializableRunnable("Client disableCQs()") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + cqService.stopCqs(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + } + }); + + pause(1 * 1000); + // Init values at server. + int size = 10; + createValues(server, regions[0], size); + pause(1 * 500); + // There should not be any creates. + validateCQ(client, "testEnableDisable_0", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ 0, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ 0, + /* queryDeletes: */ 0, + /* totalEvents: */ 0); + + /* Test for enable CQ */ + client.invoke(new CacheSerializableRunnable("Client enableCQs()") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + cqService.executeCqs(); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + } + }); + pause(1 * 1000); + createValues(server, regions[0], size); + waitForUpdated(client, "testEnableDisable_0", KEY+size); + // It gets created on the CQs + validateCQ(client, "testEnableDisable_0", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ size, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ size, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + /* Test for disableCQ on Region*/ + client.invoke(new CacheSerializableRunnable("Client disableCQs()") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + cqService.stopCqs("/root/" + regions[0]); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + } + }); + + pause(2 * 1000); + deleteValues(server, regions[0], size / 2); + pause(1 * 500); + // There should not be any deletes. + validateCQ(client, "testEnableDisable_0", + /* resultSize: */ noTest, + /* creates: */ 0, + /* updates: */ size, + /* deletes; */ 0, + /* queryInserts: */ 0, + /* queryUpdates: */ size, + /* queryDeletes: */ 0, + /* totalEvents: */ size); + + /* Test for enable CQ on region */ + client.invoke(new CacheSerializableRunnable("Client enableCQs()") { + public void run2() throws CacheException { + // Get CQ Service. + QueryService cqService = null; + try { + cqService = getCache().getQueryService(); + cqService.executeCqs("/root/" + regions[0]); + } catch (Exception cqe) { + fail("Failed to getCQService.", cqe); + } + } + }); + pause(1 * 1000); + createValues(server, regions[0], size / 2); + waitForCreated(client, "testEnableDisable_0", KEY+(size / 2)); + // Gets updated on the CQ. + validateCQ(client, "testEnableDisable_0", + /* resultSize: */ noTest, + /* creates: */ size / 2, + /* updates: */ size, + /* deletes; */ 0, + /* queryInserts: */ size / 2, + /* queryUpdates: */ size, + /* queryDeletes: */ 0, + /* totalEvents: */ size * 3 / 2); + + // Close. + closeClient(client); + closeServer(server); + } + + /** + * Test for Complex queries. + * @throws Exception + */ + public void testQuery() throws Exception { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + createServer(server); + + final int thePort = server.invokeInt
<TRUNCATED>