http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java new file mode 100644 index 0000000..c58e02c --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqQueryUsingPoolOptimizedExecuteDUnitTest.java @@ -0,0 +1,42 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.cq.dunit; + +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl; + +import dunit.SerializableRunnable; + +/** + * Test class for testing {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag + * + */ +public class CqQueryUsingPoolOptimizedExecuteDUnitTest extends CqQueryUsingPoolDUnitTest{ + + public CqQueryUsingPoolOptimizedExecuteDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false; + } + }); + } + + @Override + public void tearDown2() throws Exception { + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true; + } + }); + super.tearDown2(); + } +}
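For context, the dunit helpers used throughout the tests in this commit (createPool, createCQ, executeCQ) wrap the public GemFire continuous-query API. The following is a minimal, illustrative client-side sketch of that API, not code from this commit: the host, port, CQ name, and class name are placeholders, and a single default pool is used instead of the named pools the tests create.

    import com.gemstone.gemfire.cache.client.ClientCache;
    import com.gemstone.gemfire.cache.client.ClientCacheFactory;
    import com.gemstone.gemfire.cache.query.CqAttributes;
    import com.gemstone.gemfire.cache.query.CqAttributesFactory;
    import com.gemstone.gemfire.cache.query.CqEvent;
    import com.gemstone.gemfire.cache.query.CqListener;
    import com.gemstone.gemfire.cache.query.CqQuery;
    import com.gemstone.gemfire.cache.query.QueryService;

    public class CqClientSketch {
      public static void main(String[] args) throws Exception {
        // Connect a client cache; the server host/port are placeholders.
        // CQs are delivered over the subscription channel, so it must be enabled.
        ClientCache cache = new ClientCacheFactory()
            .addPoolServer("localhost", 40404)
            .setPoolSubscriptionEnabled(true)
            .create();

        // Query service for the default pool (the tests use named pools instead).
        QueryService queryService = cache.getQueryService();

        // A CqListener receives create/update/destroy events matching the query.
        CqAttributesFactory cqf = new CqAttributesFactory();
        cqf.addCqListener(new CqListener() {
          public void onEvent(CqEvent event) {
            System.out.println("CQ event: " + event.getQueryOperation()
                + " key=" + event.getKey());
          }
          public void onError(CqEvent event) {
            System.err.println("CQ error: " + event.getThrowable());
          }
          public void close() { }
        });
        CqAttributes cqa = cqf.create();

        // Register the CQ and run it; executeWithInitialResults() also returns
        // the rows satisfying the query at registration time (the "CQ result
        // set" that the tests below validate).
        CqQuery cq = queryService.newCq("sketchCq",
            "SELECT * FROM /root/regionA p WHERE p.ID > 3", cqa);
        int initialSize = cq.executeWithInitialResults().size();
        System.out.println("initial result size: " + initialSize);

        cq.close();
        cache.close();
      }
    }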
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java new file mode 100644 index 0000000..e7f3e0e --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolDUnitTest.java @@ -0,0 +1,1131 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.cq.dunit; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.data.Portfolio; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.index.IndexManager; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.cache30.CacheTestCase; +import com.gemstone.gemfire.internal.AvailablePortHelper; + +import dunit.Host; +import dunit.SerializableRunnable; +import dunit.VM; + +/** + * This class tests the ContiunousQuery mechanism in GemFire. 
+ * + * @author anil + */ +public class CqResultSetUsingPoolDUnitTest extends CacheTestCase { + + protected CqQueryUsingPoolDUnitTest cqDUnitTest = new CqQueryUsingPoolDUnitTest("CqResultSetUsingPoolDUnitTest"); + + private final String selStr = "SELECT * FROM /root/regionA"; + + /** Supported queries */ + public final String[] condition = new String [] { + /* 0 */ " p WHERE p.ID > 3", + /* 1 */ " p WHERE p.ID < 3", + /* 2 */ " WHERE ID = 3", + /* 3 */ " p WHERE p.ID >= 3", + /* 4 */ " p WHERE p.ID <= 3", + /* 5 */ " p WHERE p.ID > 3 AND p.status = 'active'", + /* 6 */ " WHERE status = 'active' AND ID < 3", + /* 7 */ " p WHERE p.names[0] = 'aaa'", + /* 8 */ " p WHERE p.status LIKE 'active'", + /* 9 */ " p WHERE p.collectionHolderMap.get('1').arr[0] = '0'", + /* 10 */ " p WHERE p.position1.portfolioId > 3", + /* 11 */ " p where p.position3[1].portfolioId = 2", + /* 12 */ " p where NOT(SELECT DISTINCT * FROM positions.values pos " + + " WHERE pos.secId in SET('YHOO', 'SUN', 'IBM', 'YHOO', 'GOOG', " + + " 'MSFT', 'AOL', 'APPL', 'ORCL', 'SAP', 'DELL', 'RHAT', 'NOVL', 'HP')).isEmpty", + /* 13 */ " p WHERE p.ID != 3 AND p.ID !=4", + /* 14 */ " p WHERE (p.ID = 2 OR p.ID = 4) AND p.status = 'active'", + + }; + + /** For intial size of 5 */ + public final int[] resultSize = new int [] { + /* 0 */ 2, + /* 1 */ 2, + /* 2 */ 1, + /* 3 */ 3, + /* 4 */ 3, + /* 5 */ 1, + /* 6 */ 1, + /* 7 */ 5, + /* 8 */ 2, + /* 9 */ 5, + /* 10 */ 2, + /* 11 */ 5, + /* 12 */ 5, + /* 13 */ 3, + /* 14 */ 2, + }; + + /** For intial size of 5 */ + public final String[][] expectedKeys = new String [][] { + /* 0 */ {"key-4", "key-5"}, + /* 1 */ {"key-1", "key-2"}, + /* 2 */ {"key-3"}, + /* 3 */ {"key-3", "key-4", "key-5"}, + /* 4 */ {"key-1", "key-2", "key-3"}, + /* 5 */ {"key-4"}, + /* 6 */ {"key-2"}, + /* 7 */ {"key-1", "key-2", "key-3","key-4", "key-5"}, + /* 8 */ {"key-2", "key-4"}, + /* 9 */ {"key-1", "key-2", "key-3","key-4", "key-5"}, + /* 10 */ {"key-4", "key-5"}, + /* 11 */ {"key-1", "key-2", "key-3","key-4", "key-5"}, + /* 12 */ {"key-1", "key-2", "key-3","key-4", "key-5"}, + /* 13 */ {"key-1", "key-2", "key-5"}, + /* 14 */ {"key-2", "key-4"}, + }; + + + + /** Unsupported queries */ + public final String[] condition2 = new String [] { + /* 0 */ " p1, /root/regionB p2 WHERE p1.status = p2.status", + /* 1 */ " p, p.positions.values p1 WHERE p1.secId = 'IBM'", + /* 2 */ " p, p.positions.values AS pos WHERE pos.secId != '1'", + /* 3 */ " p WHERE p.ID in (SELECT p1.ID FROM /root/regionA p1 WHERE p1.ID > 3)", + /* 4 */ ".entries entry WHERE entry.key = '1'", + /* 5 */ ".entries entry WHERE entry.value.ID > '3'", + /* 6 */ ".values p WHERE p.ID > '3' and p.status = 'active'", + /* 7 */ " p, p.position3 pos where pos.portfolioId = 1", + }; + + + public CqResultSetUsingPoolDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + + // avoid IllegalStateException from HandShake by connecting all vms tor + // system before creating ConnectionPools + getSystem(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + getSystem(); + } + }); + + } + + + /** + * Tests CQ Result Set. 
+ * + * @throws Exception + */ + public void testCqResults() throws Exception + { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + cqDUnitTest.createServer(server); + + final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server, "regionA", 5); + + // Test for supported queries. + String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Test unsupported queries. + for (int queryCnt=0; queryCnt < condition2.length; queryCnt++) { + cqQuery = selStr + condition2[queryCnt]; + + try { + cqDUnitTest.createCQ(client, poolName, "testCqResultsF_" + queryCnt, cqQuery); + //cqDUnitTest.executeCQ(client, "testCqResultsF_" + queryCnt, true, cqDUnitTest.noTest, null); + fail("UnSupported CQ Query, Expected to fail. " + + " CQ :" + "testCqResultsF_" + queryCnt + + " Query : " + cqQuery); + } catch (Exception ex) { + // Expected. + } + } + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server); + } + + + /** + * Tests CQ Result Set with Compact Range Index. + * + * @throws Exception + */ + public void testCqResultsWithCompactRangeIndex() throws Exception + { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + cqDUnitTest.createServer(server); + + final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Create index. + cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex", "p.position1.portfolioId", "/root/regionA p"); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server, "regionA", 5); + + // Test for supported queries. + String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server); + } + + /** + * Tests CQ Result Set with Range Index. 
+ * + * @throws Exception + */ + public void testCqResultsWithRangeIndex() throws Exception + { + + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + cqDUnitTest.createServer(server); + + final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Create index. + server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = true; + } + }); + + cqDUnitTest.createFunctionalIndex(server, "IdIndex", "p.ID", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server, "statusIndex", "p.status", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server, "portfolioIdIndex", "p.position1.portfolioId", "/root/regionA p"); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server, "regionA", 5); + + // Test for supported queries. + String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Unset the flag. + server.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = false; + } + }); + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server); + } + + /** + * Tests CQ Result Set. + * + * @throws Exception + */ + public void testCqResultsOnPR() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServerWithPR(server1, 0, false, 0); + cqDUnitTest.createServerWithPR(server2, 0, false, 0); + + final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server1, "regionA", 5); + + // Test for supported queries. + String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Test unsupported queries. + for (int queryCnt=0; queryCnt < condition2.length; queryCnt++) { + cqQuery = selStr + condition2[queryCnt]; + + try { + cqDUnitTest.createCQ(client, poolName, "testCqResultsF_" + queryCnt, cqQuery); + //cqDUnitTest.executeCQ(client, "testCqResultsF_" + queryCnt, true, cqDUnitTest.noTest, null); + fail("UnSupported CQ Query, Expected to fail. " + + " CQ :" + "testCqResultsF_" + queryCnt + + " Query : " + cqQuery); + } catch (Exception ex) { + // Expected. + } + } + + // Close. 
+ cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server1); + cqDUnitTest.closeServer(server2); + } + + /** + * Tests CQ Result Set with Compact Range Index. + * + * @throws Exception + */ + public void testCqResultsWithCompactRangeIndexOnPR() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServerWithPR(server1, 0, false, 0); + cqDUnitTest.createServerWithPR(server2, 0, false, 0); + + final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Create index. + cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex", "p.position1.portfolioId", "/root/regionA p"); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server1, "regionA", 5); + + // Test for supported queries. + String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server1); + cqDUnitTest.closeServer(server2); + + } + + /** + * Tests CQ Result Set with Range Index. + * + * @throws Exception + */ + public void testCqResultsWithRangeIndexOnPR() throws Exception + { + + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServerWithPR(server1, 0, false, 0); + cqDUnitTest.createServerWithPR(server2, 0, false, 0); + + final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + + String poolName = "testCqResults"; + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // Create index. + server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = true; + } + }); + + server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = true; + } + }); + + cqDUnitTest.createFunctionalIndex(server1, "IdIndex", "p.ID", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server1, "statusIndex", "p.status", "/root/regionA p"); + cqDUnitTest.createFunctionalIndex(server1, "portfolioIdIndex", "p.position1.portfolioId", "/root/regionA p"); + + // Put 5 entries into the region. + cqDUnitTest.createValues(server1, "regionA", 5); + + // Test for supported queries. 
+ String cqQuery = ""; + for (int queryCnt=0; queryCnt < condition.length; queryCnt++) { + cqQuery = selStr + condition[queryCnt]; + cqDUnitTest.createCQ(client, poolName, "testCqResultsP_" + queryCnt, cqQuery); + cqDUnitTest.executeCQ(client, "testCqResultsP_" + queryCnt, true, resultSize[queryCnt], expectedKeys[queryCnt], null); + } + + // Create index. + server1.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = false; + } + }); + + server2.invoke(new CacheSerializableRunnable("Set RangeIndex Falg") { + public void run2() throws CacheException { + IndexManager.TEST_RANGEINDEX_ONLY = false; + } + }); + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server1); + cqDUnitTest.closeServer(server2); + } + + /** + * Tests CQ Result Set. + * + * @throws Exception + */ + public void testCqResultsCaching() throws Exception + { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client = host.getVM(1); + + cqDUnitTest.createServer(server); + + final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCqResults"; + final String cqName = "testCqResultsP_0"; + + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // create CQ. + cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); + + final int numObjects = 300; + final int totalObjects = 500; + + // initialize Region. + server.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Keep updating region (async invocation). + server.invokeAsync(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + // Update (totalObjects - 1) entries. + for (int i = 1; i < totalObjects; i++) { + // Destroy entries. + if (i > 25 && i < 201) { + region.destroy(""+i); + continue; + } + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + // recreate destroyed entries. + for (int j = 26; j < 201; j++) { + Portfolio p = new Portfolio(j); + region.put(""+j, p); + } + // Add the last key. + Portfolio p = new Portfolio(totalObjects); + region.put(""+totalObjects, p); + } + }); + + // Execute CQ. + // While region operation is in progress execute CQ. + cqDUnitTest.executeCQ(client, cqName, true, null); + + // Verify CQ Cache results. + server.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? 
extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getName().equals(cqName)) { + int size = cqQuery.getCqResultKeysSize(); + if (size != totalObjects) { + getLogWriter().info("The number of Cached events " + size + + " is not equal to the expected size " + totalObjects); + HashSet expectedKeys = new HashSet(); + for (int i = 1; i < totalObjects; i++) { + expectedKeys.add("" + i); + } + Set cachedKeys = cqQuery.getCqResultKeyCache(); + expectedKeys.removeAll(cachedKeys); + getLogWriter().info("Missing keys from the Cache : " + expectedKeys); + } + assertEquals("The number of keys cached for cq " + cqName + " is wrong.", + totalObjects, cqQuery.getCqResultKeysSize()); + } + } + } + }); + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server); + } + + /** + * Tests CQ Result Set. + * + * @throws Exception + */ + public void testCqResultsCachingForMultipleCQs() throws Exception + { + final Host host = Host.getHost(0); + VM server = host.getVM(0); + VM client1 = host.getVM(1); + VM client2 = host.getVM(2); + + cqDUnitTest.createServer(server); + + final int port = server.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server.getHost()); + + String poolName = "testCqResults"; + final String cqName1 = "testCqResultsP_0"; + final String cqName2 = "testCqResultsP_1"; + + cqDUnitTest.createPool(client1, poolName, host0, port); + cqDUnitTest.createPool(client2, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client1, port, host0); + cqDUnitTest.createClient(client2, port, host0); + + // create CQ. + cqDUnitTest.createCQ(client1, poolName, cqName1, cqDUnitTest.cqs[0]); + cqDUnitTest.createCQ(client2, poolName, cqName2, cqDUnitTest.cqs[0]); + + final int numObjects = 300; + final int totalObjects = 500; + + // initialize Region. + server.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Keep updating region (async invocation). + server.invokeAsync(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + // Update (totalObjects - 1) entries. + for (int i = 1; i < totalObjects; i++) { + // Destroy entries. + if (i > 25 && i < 201) { + region.destroy(""+i); + continue; + } + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + // recreate destroyed entries. + for (int j = 26; j < 201; j++) { + Portfolio p = new Portfolio(j); + region.put(""+j, p); + } + // Add the last key. + Portfolio p = new Portfolio(totalObjects); + region.put(""+totalObjects, p); + } + }); + + // Execute CQ. + // While region operation is in progress execute CQ. + cqDUnitTest.executeCQ(client1, cqName1, true, null); + cqDUnitTest.executeCQ(client2, cqName2, true, null); + + // Verify CQ Cache results. 
+ server.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + int size = cqQuery.getCqResultKeysSize(); + if (size != totalObjects) { + getLogWriter().info("The number of Cached events " + size + + " is not equal to the expected size " + totalObjects); + HashSet expectedKeys = new HashSet(); + for (int i = 1; i < totalObjects; i++) { + expectedKeys.add("" + i); + } + Set cachedKeys = cqQuery.getCqResultKeyCache(); + expectedKeys.removeAll(cachedKeys); + getLogWriter().info("Missing keys from the Cache : " + expectedKeys); + } + assertEquals("The number of keys cached for cq " + cqQuery.getName() + " is wrong.", + totalObjects, cqQuery.getCqResultKeysSize()); + } + } + }); + + // Close. + cqDUnitTest.closeClient(client1); + cqDUnitTest.closeClient(client2); + cqDUnitTest.closeServer(server); + } + + /** + * Tests CQ Result Set. + * + * @throws Exception + */ + public void testCqResultsCachingForPR() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServerWithPR(server1, 0, false, 0); + cqDUnitTest.createServerWithPR(server2, 0, false, 0); + + final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + + String poolName = "testCqResults"; + final String cqName = "testCqResultsP_0"; + + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // create CQ. + cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); + + final int numObjects = 300; + final int totalObjects = 500; + + // initialize Region. + server1.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Keep updating region (async invocation). + server2.invokeAsync(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + // Update (totalObjects - 1) entries. + for (int i = 1; i < totalObjects; i++) { + // Destroy entries. + if (i > 25 && i < 201) { + region.destroy(""+i); + continue; + } + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + // recreate destroyed entries. + for (int j = 26; j < 201; j++) { + Portfolio p = new Portfolio(j); + region.put(""+j, p); + } + // Add the last key. + Portfolio p = new Portfolio(totalObjects); + region.put(""+totalObjects, p); + } + }); + + // Execute CQ. 
+ // While region operation is in progress execute CQ. + cqDUnitTest.executeCQ(client, cqName, true, null); + + // Verify CQ Cache results. + server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getCqResultKeysSize() <= 0) { + fail("The Result Cache for CQ on PR is not working. CQ :" + cqName); + } + } + } + }); + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server1); + cqDUnitTest.closeServer(server2); + } + + /** + * Tests CQ Result Set caching for destroy events. + * + * @throws Exception + */ + public void testCqResultsCachingForDestroyEventsOnPR() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServerWithPR(server1, 0, false, 0); + cqDUnitTest.createServerWithPR(server2, 0, false, 0); + + final int port = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, + "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + + String poolName = "testCqResults"; + final String cqName = "testCqResultsCachingForDestroyEventsOnPR_0"; + + cqDUnitTest.createPool(client, poolName, host0, port); + + // Create client. + cqDUnitTest.createClient(client, port, host0); + + // create CQ. + cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); + + // Execute CQ. + cqDUnitTest.executeCQ(client, cqName, true, null); + + final int numObjects = 50; + + // initialize Region. + server1.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Update from server2. + server2.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Destroy entries. + server2.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.destroy(""+i); + } + } + }); + + // Wait for events to be sent. + cqDUnitTest.waitForDestroyed(client, cqName, "" + numObjects); + + // Verify CQ Cache results. 
+ server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getCqResultKeysSize() > 0) { + fail("The CQ Result Cache on PR should have been empty for CQ :" + cqName + " keys=" + cqQuery.getCqResultKeyCache()); + } + } + } + }); + + server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getCqResultKeysSize() > 0) { + fail("The CQ Result Cache on PR should have been empty for CQ :" + cqName); + } + } + } + }); + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server1); + cqDUnitTest.closeServer(server2); + } + + + /** + * Tests CQ Result Caching with CQ Failover. + * + * @throws Exception + */ + public void testCqResultsCachingWithFailOver() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServer(server1); + + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1); + + String poolName = "testCQFailOver"; + final String cqName = "testCQFailOver_0"; + + cqDUnitTest.createPool(client, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}); + + // create CQ. + cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); + + final int numObjects = 300; + final int totalObjects = 500; + + // initialize Region. + server1.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Keep updating region (async invocation). + server1.invokeAsync(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + // Update (totalObjects - 1) entries. + for (int i = 1; i < totalObjects; i++) { + // Destroy entries. + if (i > 25 && i < 201) { + region.destroy(""+i); + continue; + } + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + // recreate destroyed entries. + for (int j = 26; j < 201; j++) { + Portfolio p = new Portfolio(j); + region.put(""+j, p); + } + // Add the last key. + Portfolio p = new Portfolio(totalObjects); + region.put(""+totalObjects, p); + } + }); + + // Execute CQ. + // While region operation is in progress execute CQ. 
+ cqDUnitTest.executeCQ(client, cqName, true, null); + + // Verify CQ Cache results. + server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getName().equals(cqName)) { + int size = cqQuery.getCqResultKeysSize(); + if (size != totalObjects) { + getLogWriter().info("The number of Cached events " + size + + " is not equal to the expected size " + totalObjects); + HashSet expectedKeys = new HashSet(); + for (int i = 1; i < totalObjects; i++) { + expectedKeys.add("" + i); + } + Set cachedKeys = cqQuery.getCqResultKeyCache(); + expectedKeys.removeAll(cachedKeys); + getLogWriter().info("Missing keys from the Cache : " + expectedKeys); + } + assertEquals("The number of keys cached for cq " + cqName + " is wrong.", + totalObjects, cqQuery.getCqResultKeysSize()); + } + } + } + }); + + cqDUnitTest.createServer(server2, ports[0]); + final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + System.out.println("### Port on which server1 running : " + port1 + + " Server2 running : " + thePort2); + pause(3 * 1000); + + // Close server1 for CQ fail over to server2. + cqDUnitTest.closeServer(server1); + pause(3 * 1000); + + // Verify CQ Cache results. + server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqService cqService = null; + try { + cqService = ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqService.", ex); + fail ("Failed to get the internal CqService.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = cqService.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getName().equals(cqName)) { + int size = cqQuery.getCqResultKeysSize(); + if (size != totalObjects) { + getLogWriter().info("The number of Cached events " + size + + " is not equal to the expected size " + totalObjects); + HashSet expectedKeys = new HashSet(); + for (int i = 1; i < totalObjects; i++) { + expectedKeys.add("" + i); + } + Set cachedKeys = cqQuery.getCqResultKeyCache(); + expectedKeys.removeAll(cachedKeys); + getLogWriter().info("Missing keys from the Cache : " + expectedKeys); + } + assertEquals("The number of keys cached for cq " + cqName + " is wrong.", + totalObjects, cqQuery.getCqResultKeysSize()); + } + } + } + }); + + // Close. 
+ cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server2); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java new file mode 100644 index 0000000..6d465f0 --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqResultSetUsingPoolOptimizedExecuteDUnitTest.java @@ -0,0 +1,223 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.cq.dunit; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.data.Portfolio; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.cache.query.internal.cq.ServerCQImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.internal.AvailablePortHelper; + +import dunit.Host; +import dunit.SerializableRunnable; +import dunit.VM; + +public class CqResultSetUsingPoolOptimizedExecuteDUnitTest extends CqResultSetUsingPoolDUnitTest{ + + public CqResultSetUsingPoolOptimizedExecuteDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false; + } + }); + } + + @Override + public void tearDown2() throws Exception { + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true; + } + }); + super.tearDown2(); + } + + /** + * Tests CQ Result Caching with CQ Failover. + * When EXECUTE_QUERY_DURING_INIT is false and new server calls + * execute during HA the results cache is not initialized. + * @throws Exception + */ + @Override + public void testCqResultsCachingWithFailOver() throws Exception + { + final Host host = Host.getHost(0); + VM server1 = host.getVM(0); + VM server2 = host.getVM(1); + VM client = host.getVM(2); + + cqDUnitTest.createServer(server1); + + final int port1 = server1.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + final String host0 = getServerHostName(server1.getHost()); + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(1); + + String poolName = "testCQFailOver"; + final String cqName = "testCQFailOver_0"; + + cqDUnitTest.createPool(client, poolName, new String[] {host0, host0}, new int[] {port1, ports[0]}); + + // create CQ. 
+ cqDUnitTest.createCQ(client, poolName, cqName, cqDUnitTest.cqs[0]); + + final int numObjects = 300; + final int totalObjects = 500; + + // initialize Region. + server1.invoke(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + for (int i = 1; i <= numObjects; i++) { + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + } + }); + + // Keep updating region (async invocation). + server1.invokeAsync(new CacheSerializableRunnable("Update Region"){ + public void run2()throws CacheException { + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + // Update (totalObjects - 1) entries. + for (int i = 1; i < totalObjects; i++) { + // Destroy entries. + if (i > 25 && i < 201) { + region.destroy(""+i); + continue; + } + Portfolio p = new Portfolio(i); + region.put(""+i, p); + } + // recreate destroyed entries. + for (int j = 26; j < 201; j++) { + Portfolio p = new Portfolio(j); + region.put(""+j, p); + } + // Add the last key. + Portfolio p = new Portfolio(totalObjects); + region.put(""+totalObjects, p); + } + }); + + // Execute CQ. + // While region operation is in progress execute CQ. + cqDUnitTest.executeCQ(client, cqName, true, null); + + // Verify CQ Cache results. + server1.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqServiceImpl CqServiceImpl = null; + try { + CqServiceImpl = (com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqServiceImpl.", ex); + fail ("Failed to get the internal CqServiceImpl.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = CqServiceImpl.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getName().equals(cqName)) { + int size = cqQuery.getCqResultKeysSize(); + if (size != totalObjects) { + getLogWriter().info("The number of Cached events " + size + + " is not equal to the expected size " + totalObjects); + HashSet expectedKeys = new HashSet(); + for (int i = 1; i < totalObjects; i++) { + expectedKeys.add("" + i); + } + Set cachedKeys = cqQuery.getCqResultKeyCache(); + expectedKeys.removeAll(cachedKeys); + getLogWriter().info("Missing keys from the Cache : " + expectedKeys); + } + assertEquals("The number of keys cached for cq " + cqName + " is wrong.", + totalObjects, cqQuery.getCqResultKeysSize()); + } + } + } + }); + + cqDUnitTest.createServer(server2, ports[0]); + final int thePort2 = server2.invokeInt(CqQueryUsingPoolDUnitTest.class, "getCacheServerPort"); + System.out.println("### Port on which server1 running : " + port1 + + " Server2 running : " + thePort2); + pause(3 * 1000); + + // Close server1 for CQ fail over to server2. + cqDUnitTest.closeServer(server1); + pause(3 * 1000); + + // Verify CQ Cache results. 
+ server2.invoke(new CacheSerializableRunnable("Verify CQ Cache results"){ + public void run2()throws CacheException { + CqServiceImpl CqServiceImpl = null; + try { + CqServiceImpl = (CqServiceImpl) ((DefaultQueryService)getCache().getQueryService()).getCqService(); + } catch (Exception ex) { + getLogWriter().info("Failed to get the internal CqServiceImpl.", ex); + fail ("Failed to get the internal CqServiceImpl.", ex); + } + + // Wait till all the region update is performed. + Region region = getCache().getRegion("/root/" + cqDUnitTest.regions[0]); + while(true){ + if (region.get(""+ totalObjects) == null){ + try { + Thread.sleep(50); + } catch (Exception ex){ + //ignore. + } + continue; + } + break; + } + Collection<? extends InternalCqQuery> cqs = CqServiceImpl.getAllCqs(); + for (InternalCqQuery cq: cqs){ + ServerCQImpl cqQuery = (ServerCQImpl)cq; + if (cqQuery.getName().equals(cqName)) { + int size = cqQuery.getCqResultKeysSize(); + assertEquals("The number of keys cached for cq " + cqName + " is wrong.", + 0, size); + } + } + } + }); + + // Close. + cqDUnitTest.closeClient(client); + cqDUnitTest.closeServer(server2); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java new file mode 100644 index 0000000..29ed0a4 --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStateDUnitTest.java @@ -0,0 +1,119 @@ +package com.gemstone.gemfire.cache.query.cq.dunit; + +import java.util.Properties; + +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.dunit.HelperTestCase; +import com.gemstone.gemfire.internal.AvailablePortHelper; + +import dunit.AsyncInvocation; +import dunit.DistributedTestCase; +import dunit.Host; +import dunit.SerializableCallable; +import dunit.VM; + +public class CqStateDUnitTest extends HelperTestCase { + + + public CqStateDUnitTest(String name) { + super(name); + } + + public void testNothingBecauseBug51953() { + // remove when bug #51953 is fixed + } + + // this test is disabled due to a 25% failure rate in + // CI testing. See internal ticket #52229 + public void disabledtestBug51222() throws Exception { + //The client can log this when the server shuts down. 
+ addExpectedException("Could not find any server"); + addExpectedException("java.net.ConnectException"); + final String cqName = "theCqInQuestion"; + final String regionName = "aattbbss"; + final Host host = Host.getHost(0); + VM serverA = host.getVM(1); + VM serverB = host.getVM(2); + VM client = host.getVM(3); + + final int[] ports = AvailablePortHelper.getRandomAvailableTCPPorts(2); + startCacheServer(serverA, ports[0], getAuthenticatedServerProperties()); + createReplicatedRegion(serverA, regionName, null); + + final String host0 = getServerHostName(serverA.getHost()); + startClient(client, new VM[]{ serverA, serverB }, ports, 1, getClientProperties()); + createCQ(client, cqName, "select * from /"+ regionName, null); + + //create the cacheserver but regions must be present first or else cq execute will fail with no region found + createCacheServer(serverB, ports[1], getServerProperties(0)); + createReplicatedRegion(serverB, regionName, null); + startCacheServers(serverB); + + AsyncInvocation async = executeCQ(client, cqName); + DistributedTestCase.join(async, 10000, getLogWriter()); + + Boolean clientRunning = (Boolean) client.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + final CqQuery cq = getCache().getQueryService().getCq(cqName); + waitForCriterion(new WaitCriterion() { + @Override + public boolean done() { + return cq.getState().isRunning(); + } + @Override + public String description() { + return "waiting for Cq to be in a running state: " + cq; + } + }, 30000, 1000, false); + return cq.getState().isRunning(); + } + }); + assertTrue("Client was not running", clientRunning); + + //hope that server 2 comes up before num retries is exhausted by the execute cq command + //hope that the redundancy satisfier sends message and is executed after execute cq has been executed + //This is the only way bug 51222 would be noticed + //verify that the cq on the server is still in RUNNING state; + Boolean isRunning = (Boolean) serverB.invoke(new SerializableCallable() { + @Override + public Object call() throws Exception { + CqQuery cq = getCache().getQueryService().getCqs()[0]; + return cq.getState().isRunning(); + } + + }); + + assertTrue("Cq was not running on server" , isRunning); + } + + public Properties getAuthenticatedServerProperties() { + Properties props = new Properties(); + props.put("mcast-port", "0"); + props.put("security-client-accessor", + "com.gemstone.gemfire.cache.query.dunit.CloseCacheAuthorization.create"); + props.put("security-client-accessor-pp", + "com.gemstone.gemfire.cache.query.dunit.CloseCacheAuthorization.create"); + props.put("security-client-authenticator", + "templates.security.DummyAuthenticator.create"); + return props; + } + + public Properties getServerProperties() { + Properties props = new Properties(); + props.put("mcast-port", "0"); + return props; + } + + public Properties getClientProperties() { + Properties props = new Properties(); + props.put("security-client-auth-init", + "templates.security.UserPasswordAuthInit.create"); + + props.put("security-username", "root"); + props.put("security-password", "root"); + return props; + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java 
b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java new file mode 100644 index 0000000..b66228e --- /dev/null +++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsDUnitTest.java @@ -0,0 +1,433 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.cache.query.cq.dunit; + +import java.util.Collection; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.query.CqException; +import com.gemstone.gemfire.cache.query.CqQuery; +import com.gemstone.gemfire.cache.query.CqServiceStatistics; +import com.gemstone.gemfire.cache.query.CqStatistics; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.internal.CqQueryVsdStats; +import com.gemstone.gemfire.cache.query.internal.CqStateImpl; +import com.gemstone.gemfire.cache.query.internal.DefaultQueryService; +import com.gemstone.gemfire.cache.query.internal.cq.CqQueryImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqService; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl; +import com.gemstone.gemfire.cache.query.internal.cq.CqServiceVsdStats; +import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.cache30.CacheTestCase; + +import dunit.Host; +import dunit.SerializableRunnable; +import dunit.VM; + +/** + * This class tests the ContiunousQuery mechanism in GemFire. + * This includes the test with different data activities. + * + * @author Rao + */ +public class CqStatsDUnitTest extends CacheTestCase { + + private CqQueryDUnitTest cqDUnitTest = new CqQueryDUnitTest("CqStatsDUnitTest"); + + public CqStatsDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + + // avoid IllegalStateException from HandShake by connecting all vms to + // system before creating pool + getSystem(); + invokeInEveryVM(new SerializableRunnable("getSystem") { + public void run() { + getSystem(); + } + }); + + } + + public void validateCQStats(VM vm, final String cqName, + final int creates, + final int updates, + final int deletes, + final int totalEvents, + final int cqListenerInvocations) { + vm.invoke(new CacheSerializableRunnable("Validate CQs") { + public void run2() throws CacheException { + getLogWriter().info("### Validating CQ Stats. ### " + cqName); +// Get CQ Service. + QueryService qService = null; + try { + qService = getCache().getQueryService(); + } catch (Exception cqe) { + cqe.printStackTrace(); + fail("Failed to get query service."); + } + + CqService cqService = null; + try { + cqService = ((DefaultQueryService)qService).getCqService(); + } + catch (CqException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + fail("Failed to get CqService, CQ : " + cqName); + } + Collection<? 
extends InternalCqQuery> cqs = cqService.getAllCqs(); + if (cqs.size() == 0) { + fail("Failed to get CqQuery for CQ : " + cqName); + } + CqQueryImpl cQuery = (CqQueryImpl) cqs.iterator().next(); + + CqStatistics cqStats = cQuery.getStatistics(); + CqQueryVsdStats cqVsdStats = cQuery.getVsdStats(); + if (cqStats == null || cqVsdStats == null) { + fail("Failed to get CqQuery Stats for CQ : " + cqName); + } + + getCache().getLogger().info("#### CQ stats for " + cQuery.getName() + ": " + + " Events Total: " + cqStats.numEvents() + + " Events Created: " + cqStats.numInserts() + + " Events Updated: " + cqStats.numUpdates() + + " Events Deleted: " + cqStats.numDeletes() + + " CQ Listener invocations: " + cqVsdStats.getNumCqListenerInvocations() + + " Initial results time (nano sec): " + cqVsdStats.getCqInitialResultsTime()); + + +// Check for totalEvents count. + if (totalEvents != CqQueryDUnitTest.noTest) { +// Result size validation. + assertEquals("Total Event Count mismatch", totalEvents, cqStats.numEvents()); + } + +// Check for create count. + if (creates != CqQueryDUnitTest.noTest) { + assertEquals("Create Event mismatch", creates, cqStats.numInserts()); + } + +// Check for update count. + if (updates != CqQueryDUnitTest.noTest) { + assertEquals("Update Event mismatch", updates, cqStats.numUpdates()); + } + +// Check for delete count. + if (deletes != CqQueryDUnitTest.noTest) { + assertEquals("Delete Event mismatch", deletes, cqStats.numDeletes()); + } + +// Check for CQ listener invocations. + if (cqListenerInvocations != CqQueryDUnitTest.noTest) { + assertEquals("CQ Listener invocations mismatch", cqListenerInvocations, cqVsdStats.getNumCqListenerInvocations()); + } +//// Check for initial results time. +// if (initialResultsTime != CqQueryDUnitTest.noTest && cqVsdStats.getCqInitialResultsTime() <= 0) { +// assertEquals("Initial results time mismatch", initialResultsTime, cqVsdStats.getCqInitialResultsTime()); +// } + } + }); + } + + private void validateCQServiceStats(VM vm, + final int created, + final int activated, + final int stopped, + final int closed, + final int cqsOnClient, + final int cqsOnRegion, + final int clientsWithCqs) { + vm.invoke(new CacheSerializableRunnable("Validate CQ Service Stats") { + public void run2() throws CacheException { + getLogWriter().info("### Validating CQ Service Stats. ### "); +// Get CQ Service. + QueryService qService = null; + try { + qService = getCache().getQueryService(); + } catch (Exception cqe) { + cqe.printStackTrace(); + fail("Failed to getCQService."); + } + CqServiceStatistics cqServiceStats = null; + cqServiceStats = qService.getCqStatistics(); + CqServiceVsdStats cqServiceVsdStats = null; + try { + cqServiceVsdStats = ((CqServiceImpl) ((DefaultQueryService)qService).getCqService()).stats; + } + catch (CqException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + if (cqServiceStats == null) { + fail("Failed to get CQ Service Stats"); + } + + getCache().getLogger().info("#### CQ Service stats: " + + " CQs created: " + cqServiceStats.numCqsCreated() + + " CQs active: " + cqServiceStats.numCqsActive() + + " CQs stopped: " + cqServiceStats.numCqsStopped() + + " CQs closed: " + cqServiceStats.numCqsClosed() + + " CQs on Client: " + cqServiceStats.numCqsOnClient() + + " CQs on region /root/regionA : " + cqServiceVsdStats.numCqsOnRegion("/root/regionA") + + " Clients with CQs: " + cqServiceVsdStats.getNumClientsWithCqs()); + + + // Check for created count. 
+        if (created != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs created mismatch", created, cqServiceStats.numCqsCreated());
+        }
+
+        // Check for activated count.
+        if (activated != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs activated mismatch", activated, cqServiceStats.numCqsActive());
+        }
+
+        // Check for stopped count.
+        if (stopped != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs stopped mismatch", stopped, cqServiceStats.numCqsStopped());
+        }
+
+        // Check for closed count.
+        if (closed != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs closed mismatch", closed, cqServiceStats.numCqsClosed());
+        }
+
+        // Check for CQs on client count.
+        if (cqsOnClient != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs on client mismatch", cqsOnClient, cqServiceStats.numCqsOnClient());
+        }
+
+        // Check for CQs on region.
+        if (cqsOnRegion != CqQueryDUnitTest.noTest) {
+          assertEquals("Number of CQs on region /root/regionA mismatch",
+              cqsOnRegion, cqServiceVsdStats.numCqsOnRegion("/root/regionA"));
+        }
+
+        // Check for clients with CQs count.
+        if (clientsWithCqs != CqQueryDUnitTest.noTest) {
+          assertEquals("Clients with CQs mismatch",
+              clientsWithCqs, cqServiceVsdStats.getNumClientsWithCqs());
+        }
+      }
+    });
+  }
+
+  private static final int PAUSE = 8 * 1000;
+
+  /**
+   * Test for CQ and CQ Service Statistics
+   * @throws Exception
+   */
+  public void testCQStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client = host.getVM(1);
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client, port, host0);
+
+    /* Create CQs. */
+    cqDUnitTest.createCQ(client, "testCQStatistics_0", cqDUnitTest.cqs[0]);
+
+    /* Init values at server. */
+    int size = 100;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+
+    cqDUnitTest.executeCQ(client, "testCQStatistics_0", true, null);
+
+    // Wait for CQ to be executed.
+    cqDUnitTest.waitForCqState(client, "testCQStatistics_0", CqStateImpl.RUNNING);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 0, 0, 0, 0, 0);
+
+    // The stats would not have been updated yet.
+    // Commenting out the following check; the check for resultset initialization
+    // is anyway done in the next validation.
+    // validateCQStats(server, "testCQStatistics_0", 0, 0, 0, 0, CqQueryDUnitTest.noTest, 1);
+
+    /* Init values at server. */
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], 200);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client, "testCQStatistics_0", CqQueryDUnitTest.KEY+200);
+    pause(PAUSE);
+    size = 200;
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client, "testCQStatistics_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ 100,
+        /* updates: */ 100,
+        /* deletes; */ 0,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ 200);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 100, 100, 0, 200, 200);
+    // We don't have serverside CQ name
+    validateCQStats(server, "testCQStatistics_0", 100, 100, 0, 200, CqQueryDUnitTest.noTest);
+
+    /* Delete values at server. */
+    cqDUnitTest.deleteValues(server, cqDUnitTest.regions[0], 100);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForDestroyed(client, "testCQStatistics_0", CqQueryDUnitTest.KEY+100);
+    size = 10;
+    pause(PAUSE);
+
+    cqDUnitTest.validateCQ(client, "testCQStatistics_0",
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ 100,
+        /* updates: */ 100,
+        /* deletes; */ 100,
+        /* queryInserts: */ 100,
+        /* queryUpdates: */ 100,
+        /* queryDeletes: */ 100,
+        /* totalEvents: */ 300);
+
+    // Test CQ stats
+    validateCQStats(client, "testCQStatistics_0", 100, 100, 100, 300, 300);
+    // We don't have serverside CQ name
+    validateCQStats(server, "testCQStatistics_0", 100, 100, 100, 300, CqQueryDUnitTest.noTest);
+
+    // Test CQ Close
+    cqDUnitTest.closeCQ(client, "testCQStatistics_0");
+    pause(PAUSE);
+
+    // Close.
+    cqDUnitTest.closeClient(client);
+    cqDUnitTest.closeServer(server);
+  }
+
+  /**
+   * Test for CQ Service Statistics
+   * @throws Exception
+   */
+  public void testCQServiceStatistics() throws Exception {
+
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(1);
+    VM client2 = host.getVM(2);
+
+    /* Init Server and Client */
+    cqDUnitTest.createServer(server);
+    final int port = server.invokeInt(CqQueryDUnitTest.class, "getCacheServerPort");
+    final String host0 = getServerHostName(server.getHost());
+    cqDUnitTest.createClient(client1, port, host0);
+    cqDUnitTest.createClient(client2, port, host0);
+
+    /* Create CQs. */
+    String cqName = new String("testCQServiceStatistics_0");
+    String cqName10 = new String("testCQServiceStatistics_10");
+    cqDUnitTest.createCQ(client1, cqName, cqDUnitTest.cqs[0]);
+    cqDUnitTest.createCQ(client2, cqName10, cqDUnitTest.cqs[2]);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #1");
+    validateCQServiceStats(client1, 1, 0, 1, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(server, 0, 0, 0, 0, CqQueryDUnitTest.noTest, 0, 0);
+
+    cqDUnitTest.executeCQ(client1, cqName, false, null);
+    cqDUnitTest.executeCQ(client2, cqName10, false, null);
+    pause(PAUSE);
+
+    getCache().getLogger().info("Validating CQ Service stats on clients: #2");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryDUnitTest.noTest, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryDUnitTest.noTest, 1, 2);
+
+    /* Init values at server. */
+    int size = 10;
+    cqDUnitTest.createValues(server, cqDUnitTest.regions[0], size);
+    // Wait for client to Synch.
+    cqDUnitTest.waitForCreated(client1, "testCQServiceStatistics_0", CqQueryDUnitTest.KEY+size);
+
+    // validate CQs.
+    cqDUnitTest.validateCQ(client1, cqName,
+        /* resultSize: */ CqQueryDUnitTest.noTest,
+        /* creates: */ size,
+        /* updates: */ 0,
+        /* deletes; */ 0,
+        /* queryInserts: */ size,
+        /* queryUpdates: */ 0,
+        /* queryDeletes: */ 0,
+        /* totalEvents: */ size);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on clients: #3");
+    validateCQServiceStats(client1, 1, 1, 0, 0, 1, 1, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 1, 0, 0, 1, CqQueryDUnitTest.noTest, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #1");
+    validateCQServiceStats(server, 2, 2, 0, 0, CqQueryDUnitTest.noTest, 1, 2);
+
+    // Create CQs with no name, execute, and close.
+    cqDUnitTest.createAndExecCQNoName(client1, cqDUnitTest.cqs[0]);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #4");
+    validateCQServiceStats(client1, 21, 1, 0, 20, 1, 1, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #2");
+    validateCQServiceStats(server, 22, 2, 0, 20, CqQueryDUnitTest.noTest, 1, 2);
+
+    // Test CQ Close
+    cqDUnitTest.closeCQ(client1, cqName);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #5");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryDUnitTest.noTest);
+
+    getCache().getLogger().info("Validating CQ Service stats on server: #3");
+    validateCQServiceStats(server, 22, 1, 0, 21, CqQueryDUnitTest.noTest, 0, 1);
+
+    // Test stop CQ
+    cqDUnitTest.stopCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #6");
+    validateCQServiceStats(client2, 1, 0, 1, 0, 1, CqQueryDUnitTest.noTest, CqQueryDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #4");
+    validateCQServiceStats(server, 22, 0, 1, 21, CqQueryDUnitTest.noTest, CqQueryDUnitTest.noTest, 1);
+
+    // Test CQ Close
+    cqDUnitTest.closeCQ(client2, cqName10);
+    pause(PAUSE);
+
+    // Test CQ Service stats
+    getCache().getLogger().info("Validating CQ Service stats on client: #7");
+    validateCQServiceStats(client1, 21, 0, 0, 21, 0, 0, CqQueryDUnitTest.noTest);
+    validateCQServiceStats(client2, 1, 0, 0, 1, 0, 0, CqQueryDUnitTest.noTest);
+    getCache().getLogger().info("Validating CQ Service stats on server: #5");
+    validateCQServiceStats(server, 22, 0, 0, 22, CqQueryDUnitTest.noTest, 0, 0);
+
+    // Close.
+    cqDUnitTest.closeClient(client1);
+    cqDUnitTest.closeClient(client2);
+    cqDUnitTest.closeServer(server);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
new file mode 100644
index 0000000..b7eaaa2
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/cache/query/cq/dunit/CqStatsOptimizedExecuteDUnitTest.java
@@ -0,0 +1,43 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.cache.query.cq.dunit;
+
+import com.gemstone.gemfire.cache.query.internal.cq.CqService;
+import com.gemstone.gemfire.cache.query.internal.cq.CqServiceImpl;
+
+import dunit.SerializableRunnable;
+
+/**
+ * Test class for testing the {@link CqServiceImpl#EXECUTE_QUERY_DURING_INIT} flag
+ *
+ */
+public class CqStatsOptimizedExecuteDUnitTest extends CqStatsDUnitTest {
+
+  public CqStatsOptimizedExecuteDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = false;
+      }
+    });
+  }
+
+  @Override
+  public void tearDown2() throws Exception {
+    invokeInEveryVM(new SerializableRunnable("getSystem") {
+      public void run() {
+        CqServiceImpl.EXECUTE_QUERY_DURING_INIT = true;
+      }
+    });
+    super.tearDown2();
+  }
+}
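
For readers skimming these diffs, the counters asserted above come from the public CQ statistics API rather than anything test-specific: CqQuery.getStatistics() exposes the per-CQ counters checked in validateCQStats(), and QueryService.getCqStatistics() exposes the service-wide counters checked in validateCQServiceStats(). The standalone sketch below is not part of the commit; the locator address, region name, and query string are placeholders, and it assumes a cache server hosting /root/regionA is already running with a subscription-enabled client pool.

// Illustrative only -- not from this commit. Placeholder locator/region/query.
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.query.CqAttributes;
import com.gemstone.gemfire.cache.query.CqAttributesFactory;
import com.gemstone.gemfire.cache.query.CqEvent;
import com.gemstone.gemfire.cache.query.CqListener;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.CqServiceStatistics;
import com.gemstone.gemfire.cache.query.CqStatistics;
import com.gemstone.gemfire.cache.query.QueryService;

public class CqStatsExample {
  public static void main(String[] args) throws Exception {
    // CQs require a subscription-enabled client pool.
    ClientCache cache = new ClientCacheFactory()
        .addPoolLocator("localhost", 10334)   // placeholder locator
        .setPoolSubscriptionEnabled(true)
        .create();
    QueryService qs = cache.getQueryService();

    // A CQ needs at least one listener; a no-op listener is enough here.
    CqAttributesFactory caf = new CqAttributesFactory();
    caf.addCqListener(new CqListener() {
      public void onEvent(CqEvent event) { }
      public void onError(CqEvent event) { }
      public void close() { }
    });
    CqAttributes attrs = caf.create();

    // Register and run a CQ; the query string is illustrative.
    CqQuery cq = qs.newCq("exampleCq", "SELECT * FROM /root/regionA", attrs);
    cq.execute();

    // Per-CQ counters -- the ones asserted by validateCQStats().
    CqStatistics cqStats = cq.getStatistics();
    System.out.println("events=" + cqStats.numEvents()
        + " creates=" + cqStats.numInserts()
        + " updates=" + cqStats.numUpdates()
        + " deletes=" + cqStats.numDeletes());

    // Service-wide counters -- the ones asserted by validateCQServiceStats().
    CqServiceStatistics serviceStats = qs.getCqStatistics();
    System.out.println("created=" + serviceStats.numCqsCreated()
        + " active=" + serviceStats.numCqsActive()
        + " stopped=" + serviceStats.numCqsStopped()
        + " closed=" + serviceStats.numCqsClosed());

    cq.close();
    cache.close();
  }
}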