http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java new file mode 100644 index 0000000..1c073f9 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java @@ -0,0 +1,1124 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. 
+ *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.StringTokenizer; + +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.CacheClosedException; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.EntryExistsException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionDestroyedException; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.client.ServerConnectivityException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.cache.wan.GatewaySender; +import com.gemstone.gemfire.distributed.internal.ReplyException; +import com.gemstone.gemfire.internal.cache.BucketRegion; +import com.gemstone.gemfire.internal.cache.ForceReattemptException; +import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import com.gemstone.gemfire.internal.cache.PrimaryBucketException; +import com.gemstone.gemfire.internal.cache.PutAllPartialResultException; +import com.gemstone.gemfire.internal.cache.RegionQueue; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; +import dunit.DistributedTestCase; +import dunit.DistributedTestCase.ExpectedException; +import dunit.DistributedTestCase.WaitCriterion; + +public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends WANTestBase{ + + public ReplicatedRegion_ParallelWANPropogationDUnitTest(String name) { + super(name); + // TODO Auto-generated constructor stub + } + + final String expectedExceptions = null; + + + /** + * + */ + private static final long serialVersionUID = 1L; + + public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + 
"createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); + } + catch (Exception e) { + if (!e.getCause().getMessage() + .contains("can not be used with replicated region")) { + fail("Expected GatewaySenderConfigException where parallel gateway sender can not be used with replicated region"); + } + } + } + + /*1. Validate that parallelGAtewaySenderId can be added to distributed region + *Region distributed ack/noack + PGS + *1. Find out the restrictions on totalNumBuckets on shadowPR + *2. Find out the restrictions on redundancy on shadowPR + *3. Find out the restrictions on localMaxMemory on shadowPR + *4. Find out the best way user will specify PR attributes to PGS + *5. Find out the restrictions on ordering. + *6. put on region populates the queue + *7. put on region reaches to remote site. Dispatcher works as expected + *8. m1 and m2 has DR(ack/noack). put on DR from m1 populates queue on both m1 and m2. Validate that remote site got all the events + *9. m1 and m2 has DR(ack/noack). create/put/destroy/operations populates the queue. Validate that remote site got correct events + *10. m1 and m2 has DR(ack/noack). localDestory is called on m1's DR. 
This locally destroys M1's shadowPr + *11. m1 and m2 has DR(ack/noack). destroy is called on m1's DR. This destroys entire shadowPr on m1 and m2 + *12. m1 and m2 has DR(ack/noack). close Region is called on m1's DR. This locally destroys shadowPr on m1 + *13. m1 and m2 has DR(ack/noack). cache.close on m1. This locally destroys shadowPr on m1 + *14. Validate HA scenario does not cause any event loss + *15. PDX events of DR are propagated to remote sites + *16. validate stats + *17: PR and DR regions with same name. Can this be created? If yes then how to differentiate these 2 different shadowPR. + *18. test for redundancy. For SPR, redundancy will be equal to the number of nodes where DR is present. Max is 3. I know this needs to be figured out at runtime. + *19. test without providing diskstorename. I suspect some problem with this code. diskStoreName=null looks like this is not handled very well. need to verify + *20. ParallelGatewaySenderQueue#addPR method has multiple checks for inPersistenceEnabled. Can't we do it with only one check? + */ + + /** + * Test to validate that a created parallel gateway sender's id can be added to + * distributed region + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Common parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enable it in next release + 4> Version based rolling upgrade support should be provided.
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + public void DISABLED_test_PGS_Started_DR_CREATED_NO_RECEIVER() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try { +*/ vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { + testName + "_RR", 1000 }); + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { + "ln1", 1000 }); + +/* } + finally { + exp1.remove(); + exp2.remove(); + } +*/ } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /** + * Test to validate that distributed region with given paralleGAtewaySender id + * is created first and then a same parallelGatewaySender is created + * a single put in DR is enqueued in parallelQueue + * Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0 + */ + public void DISABLED_test_DR_CREATED_PGS_STARTED_NO_RECEIVER() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try {*/ + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { + testName + "_RR", 1000 }); + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { + "ln1", 1000 }); +/* } + finally { + exp1.remove(); + exp2.remove(); + } +*/ } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + +/* ExpectedException exp1 = addExpectedException(GatewaySenderException.class + .getName(), vm4); + ExpectedException exp2 = addExpectedException(InterruptedException.class + .getName(), vm4); + try {*/ + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { + testName + "_RR", 10000 }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 10000 }); + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { + "ln1", 10000 }); +/* } + finally { + exp1.remove(); + exp2.remove(); + } + */ + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + +/* ExpectedException exp1 = addExpectedException( + GatewaySenderException.class.getName()); + ExpectedException exp2 = addExpectedException( + InterruptedException.class.getName()); + ExpectedException exp3 = addExpectedException( + CacheClosedException.class.getName()); + try { +*/ vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { + testName + "_RR", 1000 }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { + "ln1", 1000 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { + "ln1", 1000 }); + +/* } + finally { + exp1.remove(); + 
exp2.remove(); + exp3.remove(); + } +*/ + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + +// public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){ +// +// } +// public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){ +// +// } +// public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){ +// +// } +// +// public void test_DR_PGS_START_STOP_START(){ +// +// } +// +// public void test_DR_PGS_PERSISTENCE_START_STOP_START(){ +// +// } +// +// public void test_DR_PGS_START_PAUSE_STOP(){ +// +// } +// +// public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){ +// +// } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_1Nodes_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000}); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000}); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000}); + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_2Nodes_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + catch (Exception e) { + fail("Unexpected exception", e); + } + 
} + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_2Nodes_EMPTY_Put_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + +// vm4.invoke(WANTestBase.class, 
"validateRegionSize", new Object[] { testName + "_RR", +// 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 100, isOffHeap() }); + + 
vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 10, 100, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 10, 100, false, false, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 10, 100, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 10, 100, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln"}); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln"}); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln"}); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { 
"ln"}); + + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + vm5.invoke(WANTestBase.class, "doNextPuts", new Object[] { testName + "_PR", + 1000, 2000 }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_PR", + 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln", + 0 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln", + 0 }); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 1000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 1000 }); +/* } + finally { + exp1.remove(); + } +*/ } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, false }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln1", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, 
"validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm6.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm7.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); +/* } + finally { + exp1.remove(); + }*/ + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver() + throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + 
vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + // before doing any puts, let the senders be running in order to ensure + // that + // not a single event will be lost + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + vm4.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm5.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm6.invoke(WANTestBase.class, "killSender", new Object[] {}); + vm7.invoke(WANTestBase.class, "killSender", new Object[] {}); +/* } + finally { + exp1.remove(); + }*/ + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm6.invoke(WANTestBase.class, 
"createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + // ------------------------------------------------------------------------------------ + + vm4.invoke(WANTestBase.class, "doNextPuts", new Object[] { + testName + "_RR", 1000, 2000 }); + + // verify all buckets drained on all sender nodes. 
+ vm4.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + +/* exp1 = addExpectedException(CacheClosedException.class.getName()); + try {*/ + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 2000 }); + vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 2000 }); +/* } + finally { + exp1.remove(); + }*/ + } + catch (Exception e) { + fail("Unexpected exception", e); + } + + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver() throws Exception { + try { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, + "createReplicatedRegion", new Object[] { testName + "_RR", "ln1", + Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() }); + vm5.invoke(WANTestBase.class, + "createReplicatedRegion", new Object[] { testName + "_RR", "ln1", + Scope.DISTRIBUTED_NO_ACK, DataPolicy.REPLICATE, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 100, false, false, null, true}); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1"}); + + vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] { testName + "_RR", + 1000 }); + + vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln1", + 0 }); + + vm2.invoke(WANTestBase.class, 
"validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + catch (Exception e) { + fail("Unexpected exception", e); + } + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + 
vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + + pauseWaitCriteria(60000); + +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try {*/ + AsyncInvocation inv1 = vm4.invokeAsync( + ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts0", new Object[] { + testName + "_RR", 1000 }); + pause(1000); + AsyncInvocation inv2 = vm5.invokeAsync(WANTestBase.class, "killSender"); + try { + inv1.join(); + inv2.join(); + } + catch (Exception e) { + fail("UnExpected Exception", e); + } +/* } + finally { + exp1.remove(); + }*/ + + Integer size = (Integer)vm4.invoke(WANTestBase.class, + "getQueueContentSize", new Object[] { "ln" }); + getLogWriter().info("The size of the queue is in vm4 " + size); + + + vm4.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + + size = (Integer)vm4.invoke(WANTestBase.class, + "getQueueContentSize", new Object[] { "ln" }); + getLogWriter().info("The size of the queue is in vm4 " + size); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 1000 }); + } + + /**Below test is disabled intentionally + 1> In this release 8.0, for rolling upgrade support queue name is changed to old style + 2>Comman parallel sender for different non colocated regions is not supported in 8.0 so no need to bother about + ParallelGatewaySenderQueue#convertPathToName + 3> We have to enabled it in next release + 4> Version based rolling upgrade support should be provided. 
based on the version of the gemfire QSTRING should be used between 8.0 + and version prior to 8.0*/ + public void DISABLED_test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver() throws Exception { + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "startSender", 
new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm6.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, "waitForSenderRunningState", + new Object[] { "ln" }); + + pauseWaitCriteria(60000); +/* ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try */{ + AsyncInvocation inv1 = vm7.invokeAsync( + ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts0", new Object[] { + testName + "_RR", 10000 }); + pauseWaitCriteria(1000); + AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "killSender"); + pauseWaitCriteria(2000); + AsyncInvocation inv3 = vm6.invokeAsync( + ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts1", new Object[] { + testName + "_RR", 10000 }); + pauseWaitCriteria(1500); + AsyncInvocation inv4 = vm5.invokeAsync(WANTestBase.class, "killSender"); + try { + inv1.join(); + inv2.join(); + inv3.join(); + inv4.join(); + } + catch (Exception e) { + fail("UnExpected Exception", e); + } + }/* + finally { + exp1.remove(); + }*/ + + vm6.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + vm7.invoke(WANTestBase.class, + "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 10000 }); + + } + + public static void doPuts0(String regionName, int numPuts) { + ExpectedException exp = addExpectedException(ForceReattemptException.class + 
.getName()); + ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { + + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + getLogWriter().info("Put : key : " + i); + r.put(i, "0_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void doPuts1(String regionName, int numPuts){ + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { + + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + getLogWriter().info("Put : key : " + i); + r.put(i, "1_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + public static void doPuts2(String regionName, int numPuts){ + ExpectedException exp = addExpectedException(ForceReattemptException.class + .getName()); + ExpectedException exp1 = addExpectedException(CacheClosedException.class + .getName()); + try { + Region r = cache.getRegion(Region.SEPARATOR + regionName); + assertNotNull(r); + for (long i = 0; i < numPuts; i++) { + getLogWriter().info("Put : key : " + i); + r.put(i, "2_" + i); + } + } finally { + exp.remove(); + exp1.remove(); + } + } + + /** + * Test to validate that put on DR with no ack on multiple nodes are propogated to parallelqueue on multiple nodes + */ + + /** + * Test to validate that the single put in DR is propoagted to remote site through paralleHatewaySender + */ + + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java new file mode 100644 index 0000000..4e15ab7 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java @@ -0,0 +1,266 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. 
+ *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Properties; +import java.util.zip.Adler32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +import com.gemstone.gemfire.cache.CacheFactory; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.wan.GatewayReceiver; +import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory; +import com.gemstone.gemfire.cache.wan.GatewaySenderFactory; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.internal.DistributionConfig; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.internal.AvailablePortHelper; +import com.gemstone.gemfire.internal.cache.wan.CompressionInputStream; +import com.gemstone.gemfire.internal.cache.wan.CompressionOutputStream; +import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyLocatorCallback; + +import dunit.VM; + +public class SenderWithTransportFilterDUnitTest extends WANTestBase { + + private static final long serialVersionUID = 1L; + + public SenderWithTransportFilterDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void testSerialSenderWithTansportFilter() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + 
vm2.invoke(SenderWithTransportFilterDUnitTest.class, + "createReceiverWithTransportFilters", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(SenderWithTransportFilterDUnitTest.class, + "createSenderWithTransportFilter", new Object[] { "ln", 2, false, 100, + 1, false, false, true }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPuts", + new Object[] { testName + "_RR", 100 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_RR", 100 }); + } + + public void testParallelSenderWithTansportFilter() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(SenderWithTransportFilterDUnitTest.class, + "createReceiverWithTransportFilters", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 10, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(SenderWithTransportFilterDUnitTest.class, + "createSenderWithTransportFilter", new Object[] { "ln", 2, true, 100, + 1, false, false, true }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 10, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPuts", + new Object[] { testName + "_PR", 100 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { + testName + "_PR", 100 }); + } 
+ + public static int createReceiverWithTransportFilters(int locPort) { + WANTestBase test = new WANTestBase(testName); + Properties props = new Properties(); + props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); + props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort + + "]"); + + InternalDistributedSystem ds = test.getSystem(props); + cache = CacheFactory.create(ds); + GatewayReceiverFactory fact = cache.createGatewayReceiverFactory(); + int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite(); + fact.setStartPort(port); + fact.setEndPort(port); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CheckSumTranportFilter("CheckSumTranportFilter")); + transportFilters.add(new CompressionTranportFilter("CompressionTranportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + fact.addGatewayTransportFilter(filter); + } + } + GatewayReceiver receiver = fact.create(); + try { + receiver.start(); + } + catch (IOException e) { + e.printStackTrace(); + fail("Test " + test.getName() + + " failed to start GatewayRecevier on port " + port); + } + return port; + } + + public static void createSenderWithTransportFilter(String dsName, + int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize, + boolean isConflation, boolean isPersistent, boolean isManulaStart) { + File persistentDirectory = new File(dsName + "_disk_" + + System.currentTimeMillis() + "_" + VM.getCurrentVMNum()); + persistentDirectory.mkdir(); + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + File[] dirs1 = new File[] { persistentDirectory }; + + if (isParallel) { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setParallel(true); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + ((InternalGatewaySenderFactory)gateway).setLocatorDiscoveryCallback(new 
MyLocatorCallback()); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CompressionTranportFilter("CompressionTranportFilter")); + transportFilters.add(new CheckSumTranportFilter("CheckSumTranportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.setBatchConflationEnabled(isConflation); + gateway.create(dsName, remoteDsId); + + } + else { + GatewaySenderFactory gateway = cache + .createGatewaySenderFactory(); + gateway.setMaximumQueueMemory(maxMemory); + gateway.setBatchSize(batchSize); + gateway.setManualStart(isManulaStart); + ((InternalGatewaySenderFactory)gateway) + .setLocatorDiscoveryCallback(new MyLocatorCallback()); + ArrayList<GatewayTransportFilter> transportFilters = new ArrayList<GatewayTransportFilter>(); + transportFilters.add(new CompressionTranportFilter("CompressionTranportFilter")); + transportFilters.add(new CheckSumTranportFilter("CheckSumTranportFilter")); + if (!transportFilters.isEmpty()) { + for (GatewayTransportFilter filter : transportFilters) { + gateway.addGatewayTransportFilter(filter); + } + } + gateway.setBatchConflationEnabled(isConflation); + if (isPersistent) { + gateway.setPersistenceEnabled(true); + gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName) + .getName()); + } + else { + DiskStore store = dsf.setDiskDirs(dirs1).create(dsName); + gateway.setDiskStoreName(store.getName()); + } + gateway.create(dsName, remoteDsId); + } + } + + static class CompressionTranportFilter implements GatewayTransportFilter { + + private String name; + + public CompressionTranportFilter(String 
name){ + this.name = name; + } + + public String toString(){ + return this.name; + } + public InputStream getInputStream(InputStream stream) { + return new CompressionInputStream(stream); + // return new ZipInputStream(stream); + } + + public OutputStream getOutputStream(OutputStream stream) { + return new CompressionOutputStream(stream); + // return new ZipOutputStream(stream); + } + + public void close() { + // TODO Auto-generated method stub + } + + } + + static class CheckSumTranportFilter implements GatewayTransportFilter { + + Adler32 checker = new Adler32(); + + private String name; + + public CheckSumTranportFilter(String name){ + this.name = name; + } + + public String toString(){ + return this.name; + } + public InputStream getInputStream(InputStream stream) { + return new CheckedInputStream(stream, checker); + // return new ZipInputStream(stream); + } + + public OutputStream getOutputStream(OutputStream stream) { + return new CheckedOutputStream(stream, checker); + // return new ZipOutputStream(stream); + } + + public void close() { + // TODO Auto-generated method stub + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java new file mode 100644 index 0000000..211f726 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java @@ -0,0 +1,197 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. 
+ * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import java.util.Set; + +import com.gemstone.gemfire.admin.AdminDistributedSystemFactory; +import com.gemstone.gemfire.admin.AdminException; +import com.gemstone.gemfire.admin.DistributedSystemConfig; +import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.internal.cache.CacheObserverAdapter; +import com.gemstone.gemfire.internal.cache.CacheObserverHolder; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; +import dunit.DistributedTestCase; +import dunit.SerializableRunnable; +import dunit.VM; + +public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase { + private static final long MAX_WAIT = 70000; + + private static final int NUM_KEYS = 1000; + + public ShutdownAllPersistentGatewaySenderDUnitTest(String name) { + super(name); + } + + @Override + public void setUp() throws Exception { + super.setUp(); + addExpectedException("Cache is being closed by ShutdownAll"); + } + + private static final long serialVersionUID = 1L; + + public void testGatewaySender() throws Exception { + addExpectedException("Cache is shutting down"); + + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + vm2.invoke(WANTestBase.class, 
"createReceiverAfterCache", + new Object[] { nyPort }); + + vm2.invoke(WANTestBase.class, "createPersistentPartitionedRegion", + new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm3.invoke(WANTestBase.class, "createPersistentPartitionedRegion", + new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 400, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "createPersistentPartitionedRegion", + new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() }); + + // set the CacheObserver to block the ShutdownAll + SerializableRunnable waitAtShutdownAll = new SerializableRunnable() { + @Override + public void run() { + LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true; + CacheObserverHolder.setInstance(new CacheObserverAdapter() { + @Override + public void beforeShutdownAll() { + final Region region = cache.getRegion(testName + "_PR"); + waitForCriterion(new WaitCriterion() { + @Override + public boolean done() { + return region.size() >= 2; + } + + @Override + public String description() { + return "Wait for wan to have processed several events"; + } + }, 30000, 100, true); + } + }); + } + }; + vm2.invoke(waitAtShutdownAll); + vm3.invoke(waitAtShutdownAll); + + AsyncInvocation vm4_future = vm4.invokeAsync(WANTestBase.class, "doPuts", + new Object[] { testName + "_PR", NUM_KEYS }); + + // ShutdownAll will be suspended at observer, so puts will continue + AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT); + future.join(MAX_WAIT); + + // now restart vm1 with gatewayHub + getLogWriter().info("restart in VM2"); + vm2.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + vm3.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + AsyncInvocation vm3_future = 
vm3.invokeAsync(WANTestBase.class, + "createPersistentPartitionedRegion", new Object[] { testName + "_PR", + "ln", 1, 100, isOffHeap() }); + vm2.invoke(WANTestBase.class, "createPersistentPartitionedRegion", + new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() }); + vm3_future.join(MAX_WAIT); + + vm3.invoke(new SerializableRunnable() { + public void run() { + final Region region = cache.getRegion(testName + "_PR"); + cache.getLogger().info( + "vm1's region size before restart gatewayhub is " + region.size()); + } + }); + vm2.invoke(WANTestBase.class, "createReceiverAfterCache", + new Object[] { nyPort }); + + // wait for vm0 to finish its work + vm4_future.join(MAX_WAIT); + vm4.invoke(new SerializableRunnable() { + public void run() { + Region region = cache.getRegion(testName + "_PR"); + assertEquals(NUM_KEYS, region.size()); + } + }); + + // verify the other side (vm1)'s entries received from gateway + vm2.invoke(new SerializableRunnable() { + public void run() { + final Region region = cache.getRegion(testName + "_PR"); + + cache.getLogger().info( + "vm1's region size after restart gatewayhub is " + region.size()); + waitForCriterion(new WaitCriterion() { + public boolean done() { + Object lastvalue = region.get(NUM_KEYS - 1); + if (lastvalue != null && lastvalue.equals(NUM_KEYS - 1)) { + region.getCache().getLogger().info( + "Last key has arrived, its value is " + lastvalue + + ", end of wait."); + return true; + } + else + return (region.size() == NUM_KEYS); + } + + public String description() { + return "Waiting for destination region to reach size: " + NUM_KEYS + + ", current is " + region.size(); + } + }, MAX_WAIT, 100, true); + assertEquals(NUM_KEYS, region.size()); + } + }); + + } + + private AsyncInvocation shutDownAllMembers(VM vm, final int expnum, final long timeout) { + AsyncInvocation future = vm.invokeAsync(new SerializableRunnable("Shutdown all the members") { + + public void run() { + DistributedSystemConfig config; + 
AdminDistributedSystemImpl adminDS = null; + try { + config = AdminDistributedSystemFactory.defineDistributedSystem(cache + .getDistributedSystem(), ""); + adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory + .getDistributedSystem(config); + adminDS.connect(); + Set members = adminDS.shutDownAllMembers(timeout); + int num = members == null ? 0 : members.size(); + assertEquals(expnum, num); + } + catch (AdminException e) { + throw new RuntimeException(e); + } + finally { + if (adminDS != null) { + adminDS.disconnect(); + } + } + } + }); + return future; + } + +}