http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java new file mode 100644 index 0000000..447f5ea --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java @@ -0,0 +1,603 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.internal.cache.wan.BatchException70; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; + +/** + * All the test cases are similar to SerialWANPropogationDUnitTest except that + * the we create concurrent serial GatewaySender with concurrency of 4 + * @author skumar + * + */ +public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase { + + /** + * @param name + */ + public ConcurrentWANPropogation_1_DUnitTest(String name) { + super(name); + } + + private static final long serialVersionUID = 1L; + + /** + * All the test cases are similar to SerialWANPropogationDUnitTest + * @throws Exception + */ + public void testReplicatedSerialPropagation_withoutRemoteSite() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + //keep the batch size high enough to reduce the number of exceptions in the log + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 400, false, false, null, true, 4, OrderPolicy.KEY }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + addExpectedException(BatchException70.class.getName()); + addExpectedException(ServerOperationException.class.getName()); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm2.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm2.invoke(WANTestBase.class, "createReceiver2", + new Object[] {nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver2", + new Object[] {nyPort }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + public void testReplicatedSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + + public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt() throws Exception { + addExpectedException("Broken pipe"); + addExpectedException("Connection reset"); + addExpectedException("Unexpected IOException"); + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + //---------close local site and build again----------------------------------------- + vm4.invoke(WANTestBase.class, "killSender", new Object[] { }); + vm5.invoke(WANTestBase.class, "killSender", new Object[] { }); + vm6.invoke(WANTestBase.class, "killSender", new Object[] { }); + vm7.invoke(WANTestBase.class, "killSender", new Object[] { }); + + Integer regionSize = + (Integer) vm2.invoke(WANTestBase.class, "getRegionSize", new Object[] {testName + "_RR" }); + getLogWriter().info("Region size on remote is: " + regionSize); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true }); + vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException", new Object[] { "ln", true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + addExpectedException(EntryExistsException.class.getName()); + addExpectedException(BatchException70.class.getName()); + addExpectedException(ServerOperationException.class.getName()); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + //---------------------------------------------------------------------------------- + + //verify remote site receives all the events + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + /** + * Two regions configured with the same sender and put is in progress + * on both the regions. + * One of the two regions is destroyed in the middle. + * + * @throws Exception + */ + public void testReplicatedSerialPropagationWithLocalRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + //these are part of remote site + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + //these are part of local site + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + //senders are created on local site + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 20, false, false, null, true, 3, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 20, false, false, null, true ,3, OrderPolicy.THREAD}); + + //create one RR (RR_1) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + + //create another RR (RR_2) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + + //start the senders on local site + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + //create one RR (RR_1) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + + //create another RR (RR_2) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 1000 }); + //do puts in RR_2 in main thread + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_2", 500 }); + //destroy RR_2 after above puts are complete + vm4.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName + "_RR_2"}); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + //sleep for some time to let all the events propagate to remote site + Thread.sleep(20); + //vm4.invoke(WANTestBase.class, "verifyQueueSize", new Object[] { "ln", 0 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR_1", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR_2", 500 }); + } + + /** + * 1 region and sender configured on local site and 1 region and a + * receiver configured on remote site. Puts to the local region are in progress. + * Remote region is destroyed in the middle. + * + * @throws Exception + */ + public void testReplicatedSerialPropagationWithRemoteRegionDestroy() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + //these are part of remote site + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + //these are part of local site + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + //senders are created on local site + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 500, false, false, null, true, 5, OrderPolicy.KEY }); + + //create one RR (RR_1) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + + //start the senders on local site + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + //create one RR (RR_1) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + + addExpectedException(BatchException70.class.getName()); + addExpectedException(ServerOperationException.class.getName()); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 10000 }); + //destroy RR_1 in remote site + vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName + "_RR_1"}); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + //verify that all is well in local site. All the events should be present in local region + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR_1", 10000 }); + //assuming some events might have been dispatched before the remote region was destroyed, + //sender's region queue will have events less than 1000 but the queue will not be empty. + //NOTE: this much verification might be sufficient in DUnit. Hydra will take care of + //more in depth validations. + vm4.invoke(WANTestBase.class, "verifyRegionQueueNotEmptyForConcurrentSender", new Object[] {"ln" }); + } + + /** + * Two regions configured in local with the same sender and put is in progress + * on both the regions. Same two regions are configured on remote site as well. + * One of the two regions is destroyed in the middle on remote site. + * + * @throws Exception + */ + public void testReplicatedSerialPropagationWithRemoteRegionDestroy2() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + //these are part of remote site + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + //these are part of local site + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + //senders are created on local site + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD }); + + //create one RR (RR_1) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + + //create another RR (RR_2) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + + //start the senders on local site + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + //create one RR (RR_1) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + + //create another RR (RR_2) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + //destroy RR_2 on remote site in the middle + vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName + "_RR_2"}); + + //expected exceptions in the logs + addExpectedException(BatchException70.class.getName()); + addExpectedException(ServerOperationException.class.getName()); + + //start puts in RR_2 in another thread + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_2", 1000 }); + + //start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new Object[] { testName + "_RR_1", 1000 }); + + try { + inv1.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + //though region RR_2 is destroyed, RR_1 should still get all the events put in it + //in local site + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR_1", 1000 }); + + } + + public void testReplicatedSerialPropagationWithRemoteRegionDestroy3() + throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + // these are part of remote site + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + // these are part of local site + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + // senders are created on local site + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 200, false, false, null, true, 5, OrderPolicy.THREAD }); + + // create one RR (RR_1) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", null, isOffHeap() }); + + // create another RR (RR_2) on remote site + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", null, isOffHeap() }); + + // start the senders on local site + vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException", + new Object[] { "ln", true }); + vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException", + new Object[] { "ln", true }); + + // start the senders on local site + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + // create one RR (RR_1) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_1", "ln", isOffHeap() }); + + // create another RR (RR_2) on local site + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR_2", "ln", isOffHeap() }); + + addExpectedException(BatchException70.class.getName()); + addExpectedException(ServerOperationException.class.getName()); + + // start puts in RR_1 in another thread + AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts", + new Object[] { testName + "_RR_1", 1000 }); + // start puts in RR_2 in another thread + AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "doPuts", + new Object[] { testName + "_RR_2", 1000 }); + // destroy RR_2 on remote site in the middle + vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { testName + + "_RR_2" }); + + try { + inv1.join(); + inv2.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + // though region RR_2 is destroyed, RR_1 should still get all the events put + // in it + // in local site + try { + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR_1", 1000 }); + } finally { + System.setProperty( + "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + vm4.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + + vm5.invoke(new CacheSerializableRunnable("UnSetting system property ") { + public void run2() throws CacheException { + System.setProperty( + "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False"); + } + }); + } + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java new file mode 100644 index 0000000..3f95552 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java @@ -0,0 +1,480 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.concurrent; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter; + +import dunit.AsyncInvocation; + +/** + * All the test cases are similar to SerialWANPropogationDUnitTest except that + * the we create concurrent serial GatewaySender with concurrency of 4 + * @author skumar + * + */ +public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase { + + /** + * @param name + */ + public ConcurrentWANPropogation_2_DUnitTest(String name) { + super(name); + } + + private static final long serialVersionUID = 1L; + + public void testSerialReplicatedWanWithOverflow() { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId", + new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", + new Object[] {nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", + new Object[] {nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + //keep the maxQueueMemory low enough to trigger eviction + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 50, 5, false, false, null, true, 5, OrderPolicy.KEY }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 50, 5, false, false, null, true, 5, OrderPolicy.KEY }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { + testName + "_RR", 150 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 150 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 150 }); + } + + public void Bug46921_testSerialReplicatedWanWithPersistence() { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, true, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + + } + + public void testReplicatedSerialPropagationToTwoWanSites() throws Exception { + + Integer lnPort = createFirstLocatorWithDSId(1); + Integer nyPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer tkPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 3, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "lnSerial1", + 2, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "lnSerial2", + 3, false, 100, 10, false, false, null, true, 5 , OrderPolicy.THREAD}); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial1" }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "lnSerial2" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "lnSerial1,lnSerial2", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + public void testReplicatedSerialPropagationHA() throws Exception { + addExpectedException("Broken pipe"); + addExpectedException("Connection reset"); + addExpectedException("Unexpected IOException"); + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + AsyncInvocation inv1 = vm5.invokeAsync(WANTestBase.class, "doPuts", + new Object[] { testName + "_RR", 10000 }); + pause(2000); + AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender"); + + inv1.join(); + inv2.join(); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 10000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 10000 }); + } + + public void testReplicatedSerialPropagationWithConflation() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 1000, true, false, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + public void testReplicatedSerialPropagationWithParallelThreads() + throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] {1}); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] {2,lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 4, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doMultiThreadedPuts", new Object[] { + testName + "_RR", 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + public void testSerialPropogationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, "createFirstLocatorWithDSId", + new Object[] {1}); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] {2,lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName, null, 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName, 800 }); + } + + public void testReplicatedSerialPropagationWithFilter() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] {lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, + new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName, "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName, 1000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName, 800 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName, 800 }); + } + + public void testNormalRegionSerialPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true, 5, OrderPolicy.THREAD }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + pause(500); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createNormalRegion", new Object[] { + testName + "_RR", "ln" }); + vm5.invoke(WANTestBase.class, "createNormalRegion", new Object[] { + testName + "_RR", "ln" }); + + vm5.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "checkQueueStats", new Object[] { "ln", 0, + 0, 0, 0}); + + vm5.invoke(WANTestBase.class, "checkQueueStats", new Object[] { "ln", 0, + 1000, 0, 0 }); + + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000}); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 0}); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 0 }); + + vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats", new Object[] {0, 0, 0}); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java new file mode 100644 index 0000000..057f89b --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java @@ -0,0 +1,194 @@ +package com.gemstone.gemfire.internal.cache.wan.disttx; + +import com.gemstone.gemfire.CancelException; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; +import dunit.SerializableCallable; + +public class DistTXWANDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public DistTXWANDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + + this.invokeInEveryVM(new SerializableCallable() { + @Override + public Object call() throws Exception { + System.setProperty("gemfire.log-level", getDUnitLogLevel()); + return null; + } + }); + } + + + /* + * Disabled because it hangs with current implementation of notifying + * adjunct receivers by sending DistTXAdjunctCommitMessage from primary at the + * time of commit. + */ + public void DISABLED_testPartitionedSerialPropagation_SenderSameAsCoordinator() throws Exception { + + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { testName + "_PR", + 50 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 50 }); + } + + public void testPartitionedSerialPropagation_SenderNotSameAsCoordinator() throws Exception { + + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + + vm6.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { testName + "_PR", + 50 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 50 }); + } + + + public void testPartitionedRegionParallelPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { testName + "_PR", + 5 }); + + //verify all buckets drained on all sender nodes. + vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 5 }); + } + + public void testDummy() { + } + +} + http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java new file mode 100644 index 0000000..eea4f81 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java @@ -0,0 +1,476 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; +import com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue; + +import dunit.AsyncInvocation; +import dunit.DistributedTestCase; + +import java.util.Set; + +/** + * @author skumar + * + */ +public class CommonParallelGatewaySenderDUnitTest extends WANTestBase { + + public CommonParallelGatewaySenderDUnitTest(String name ){ + super(name); + } + + public void testSameSenderWithNonColocatedRegions() throws Exception { + addExpectedException("cannot have the same parallel"); + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + try { + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + fail("Expected IllegateStateException : cannot have the same parallel gateway sender"); + } + catch (Exception e) { + if (!(e.getCause() instanceof IllegalStateException) + || !(e.getCause().getMessage() + .contains("cannot have the same parallel gateway sender id"))) { + fail("Expected IllegalStateException", e); + } + } + } + + /** + * Simple scenario. Two regions attach the same PGS + * @throws Exception + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + public void DISABLED_testParallelPropagation() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", null, 1, 100, isOffHeap() }); + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", null, 1, 100, isOffHeap() }); + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR1", + 1000 }); + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR2", + 1000 }); + + //verify all buckets drained on all sender nodes. + vm4.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm5.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm6.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm7.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR1", 1000 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR2", 1000 }); + } + + /** + * The PGS is persistence enabled but not the Regions + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + public void DISABLED_testParallelPropagationPersistenceEnabled() throws Exception { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, true, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, true, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, true, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, true, null, true }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR1", null, 1, 100, isOffHeap() }); + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR2", null, 1, 100, isOffHeap() }); + //before doing any puts, let the senders be running in order to ensure that + //not a single event will be lost + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR1", + 1000 }); + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR2", + 1000 }); + + //verify all buckets drained on all sender nodes. + vm4.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm5.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm6.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + vm7.invoke(CommonParallelGatewaySenderDUnitTest.class, "validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"}); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR1", 1000 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR2", 1000 }); + } + + + /** + * Enable persistence for GatewaySender. + * Pause the sender and do some puts in local region. + * Close the local site and rebuild the region and sender from disk store. + * Dispatcher should not start dispatching events recovered from persistent sender. + * Check if the remote site receives all the events. + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + public void DISABLED_testPRWithGatewaySenderPersistenceEnabled_Restart() { + //create locator on local site + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + //create locator on remote site + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + //create receiver on remote site + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + //create cache in local site + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + //create senders with disk store + String diskStore1 = (String) vm4.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, null, true }); + String diskStore2 = (String) vm5.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, null, true }); + String diskStore3 = (String) vm6.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, null, true }); + String diskStore4 = (String) vm7.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, null, true }); + + getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + "," + diskStore3 + "," + diskStore4); + + //create PR on remote site + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", null, 1, 100, isOffHeap() }); + + //create PR on remote site + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", null, 1, 100, isOffHeap() }); + + //create PR on local site + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + + //create PR on local site + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + + + //start the senders on local site + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + //wait for senders to become running + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + //pause the senders + vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" }); + + //start puts in region on local site + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName+"PR1", 3000 }); + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName+"PR2", 5000 }); + getLogWriter().info("Completed puts in the region"); + + //--------------------close and rebuild local site ------------------------------------------------- + //kill the senders + vm4.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm5.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm6.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm7.invoke(WANTestBase.class, "killSender", new Object[] {}); + + getLogWriter().info("Killed all the senders."); + + //restart the vm + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + getLogWriter().info("Created back the cache"); + + //create senders with disk store + vm4.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore1, true }); + vm5.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore2, true }); + vm6.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore3, true }); + vm7.invoke(WANTestBase.class, "createSenderWithDiskStore", + new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore4, true }); + + getLogWriter().info("Created the senders back from the disk store."); + //create PR on local site + AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + AsyncInvocation inv2 = vm5.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + AsyncInvocation inv3 = vm6.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + AsyncInvocation inv4 = vm7.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR1", "ln", 1, 100, isOffHeap() }); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + inv1 = vm4.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + inv2 = vm5.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + inv3 = vm6.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + inv4 = vm7.invokeAsync(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName+"PR2", "ln", 1, 100, isOffHeap() }); + + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + fail(); + } + + getLogWriter().info("Created back the partitioned regions"); + + //start the senders in async mode. This will ensure that the + //node of shadow PR that went down last will come up first + vm4.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + + getLogWriter().info("Waiting for senders running."); + //wait for senders running + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln" }); + + getLogWriter().info("All the senders are now running..."); + + //---------------------------------------------------------------------------------------------------- + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName+"PR1", 3000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName+"PR1", 3000 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName+"PR2", 5000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName+"PR2", 5000 }); + } + + public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) { + Set<GatewaySender> senders = cache.getGatewaySenders(); + GatewaySender sender = null; + for (GatewaySender s : senders) { + if (s.getId().equals(senderId)) { + sender = s; + break; + } + } + ConcurrentParallelGatewaySenderQueue regionQueue = (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender).getQueues().toArray(new RegionQueue[1])[0]; + + Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions(); + + for(PartitionedRegion shadowPR: shadowPRs) { + Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions(); + + for (final BucketRegion bucket : buckets) { + WaitCriterion wc = new WaitCriterion() { + public boolean done() { + if (bucket.keySet().size() == 0) { + getLogWriter().info("Bucket " + bucket.getId() + " is empty"); + return true; + } + return false; + } + + public String description() { + return "Expected bucket entries for bucket: " + bucket.getId() + " is: 0 but actual entries: " + + bucket.keySet().size() + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary() + " KEYSET: " + bucket.keySet(); + } + }; + DistributedTestCase.waitForCriterion(wc, 180000, 50, true); + + }//for loop ends + } + + + } + +}