http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
new file mode 100644
index 0000000..1c073f9
--- /dev/null
+++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ReplicatedRegion_ParallelWANPropogationDUnitTest.java
@@ -0,0 +1,1124 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.distributed.internal.ReplyException;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.PrimaryBucketException;
+import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.DistributedTestCase.ExpectedException;
+import dunit.DistributedTestCase.WaitCriterion;
+
+public class ReplicatedRegion_ParallelWANPropogationDUnitTest extends 
WANTestBase{
+
+  /**
+   * Creates the test case.
+   *
+   * @param name test name, passed unchanged to the superclass constructor
+   */
+  public ReplicatedRegion_ParallelWANPropogationDUnitTest(String name) {
+    super(name);
+  }
+
+  // Always null; not referenced in the part of this file visible to review.
+  final String expectedExceptions = null;
+  
+  
+  /** Serialization version identifier of this test class. */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Checks that attaching gateway sender "ln1" to a replicated region is
+   * rejected.
+   * <p>
+   * The test creates a receiver and a replicated region on vm2, then on vm4
+   * creates a cache, a replicated region that references sender "ln1", the
+   * sender itself, and finally starts the sender. That sequence is expected
+   * to throw an exception whose cause message contains
+   * "can not be used with replicated region". If nothing is thrown, or the
+   * thrown exception does not carry that message, the test fails.
+   *
+   * @throws Exception declared by the signature; exceptions raised by the
+   *           setup steps are caught and inspected inside the method
+   */
+  public void test_DR_PGS_1Nodes_Put_Receiver() throws Exception {
+    final String expectedMessage = "can not be used with replicated region";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          testName + "_RR", null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          testName + "_RR", "ln1", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+      fail("Expected GatewaySenderConfigException where parallel gateway "
+          + "sender can not be used with replicated region");
+    }
+    catch (Exception e) {
+      // Guard against an exception without a cause (or a cause without a
+      // message): the unguarded chain would throw a NullPointerException and
+      // hide the real failure.
+      Throwable cause = e.getCause();
+      String causeMessage = (cause == null) ? null : cause.getMessage();
+      if (causeMessage == null || !causeMessage.contains(expectedMessage)) {
+        // Attach the unexpected exception so its stack trace is reported.
+        fail("Expected GatewaySenderConfigException where parallel gateway "
+            + "sender can not be used with replicated region", e);
+      }
+    }
+  }
+  
+  /*1. Validate that parallelGatewaySenderId can be added to distributed region
+   *Region distributed ack/noack + PGS
+   *1. Find out the restrictions on totalNumBuckets on shadowPR
+   *2. Find out the restrictions on redundancy on shadowPR
+   *3. Find out the restrictions on localMaxMemory on shadowPR
+   *4. Find out the best way user will specify PR attributes to PGS
+   *5. Find out the restrictions on ordering.
+   *6. put on region populates the queue
+   *7. put on region reaches the remote site. Dispatcher works as expected
+   *8. m1 and m2 have DR(ack/noack). put on DR from m1 populates queue on both
+   *   m1 and m2. Validate that remote site got all the events
+   *9. m1 and m2 have DR(ack/noack). create/put/destroy operations populate
+   *   the queue. Validate that remote site got correct events
+   *10. m1 and m2 have DR(ack/noack). localDestroy is called on m1's DR. This
+   *    locally destroys m1's shadowPR
+   *11. m1 and m2 have DR(ack/noack). destroy is called on m1's DR. This
+   *    destroys the entire shadowPR on m1 and m2
+   *12. m1 and m2 have DR(ack/noack). close Region is called on m1's DR. This
+   *    locally destroys shadowPR on m1
+   *13. m1 and m2 have DR(ack/noack). cache.close on m1. This locally destroys
+   *    shadowPR on m1
+   *14. Validate HA scenario does not cause any event loss
+   *15. PDX events of DR are propagated to remote sites
+   *16. Validate stats
+   *17. PR and DR regions with the same name: can these be created? If yes,
+   *    then how to differentiate these 2 different shadowPRs.
+   *18. Test for redundancy. For the shadowPR, redundancy will be equal to the
+   *    number of nodes where DR is present. Max is 3. I know this needs to be
+   *    figured out at runtime.
+   *19. Test without providing diskStoreName. I suspect some problem with this
+   *    code. diskStoreName=null looks like it is not handled very well. Need
+   *    to verify
+   *20. ParallelGatewaySenderQueue#addPR method has multiple checks for
+   *    inPersistenceEnabled. Can't we do it with only one check?
+  */
+  
+  /**
+   * Test to validate that the id of an already created parallel gateway
+   * sender can be added to a distributed region.
+   * <p>
+   * On vm4 the test creates sender "ln1", then a replicated region that
+   * references it, performs 1000 puts and validates the queue contents of
+   * "ln1" against 1000. No receiver is created.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_PGS_Started_DR_CREATED_NO_RECEIVER()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      // The remote locator is still started; its port is not kept because
+      // this test creates no receiver.
+      vm1.invoke(WANTestBase.class, "createFirstRemoteLocator",
+          new Object[] { 2, lnPort });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      // Registration of GatewaySenderException and InterruptedException as
+      // expected exceptions on vm4 was commented out in the original test.
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+
+  /**
+   * Test to validate that a distributed region referencing a given parallel
+   * gateway sender id is created first, that the sender with the same id is
+   * created afterwards, and that puts on the region are enqueued in the
+   * parallel queue.
+   * <p>
+   * On vm4 the test creates the replicated region, then sender "ln1",
+   * performs 1000 puts and validates the queue contents of "ln1" against
+   * 1000. No receiver is created.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_CREATED_PGS_STARTED_NO_RECEIVER()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      // The remote locator is still started; its port is not kept because
+      // this test creates no receiver.
+      vm1.invoke(WANTestBase.class, "createFirstRemoteLocator",
+          new Object[] { 2, lnPort });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      // Registration of GatewaySenderException and InterruptedException as
+      // expected exceptions on vm4 was commented out in the original test.
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+
+  /**
+   * Single-member scenario without a receiver: on vm4 the test creates a
+   * replicated region referencing sender "ln1", creates and starts the
+   * sender, performs 10000 puts, then validates the region size and the
+   * queue contents of "ln1" against 10000.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_1Node_Put_ValidateQueue_No_Receiver()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      // The remote locator is still started; its port is not kept because
+      // this test creates no receiver.
+      vm1.invoke(WANTestBase.class, "createFirstRemoteLocator",
+          new Object[] { 2, lnPort });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      // Registration of GatewaySenderException and InterruptedException as
+      // expected exceptions on vm4 was commented out in the original test.
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 10000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 10000 });
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 10000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * Two-member scenario without a receiver: vm4 and vm5 each create a
+   * replicated region referencing sender "ln1" and create and start the
+   * sender. vm4 performs 1000 puts; the region size and the queue contents
+   * of "ln1" are then validated against 1000 on both members.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_2Nodes_Put_ValidateQueue_No_Receiver()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      // The remote locator is still started; its port is not kept because
+      // this test creates no receiver.
+      vm1.invoke(WANTestBase.class, "createFirstRemoteLocator",
+          new Object[] { 2, lnPort });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      // Registration of GatewaySenderException, InterruptedException and
+      // CacheClosedException as expected exceptions was commented out in the
+      // original test.
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+      vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 1000 });
+      vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+//  public void test_DR_PGS_ORDERPOLICY_PARTITION_EXPECTException(){
+//    
+//  }
+//  public void test_DR_PGS_DISKSTORE_NAME_PROVIDED_VALIDATE_DISK(){
+//    
+//  }
+//  public void test_DR_PGS_DISKSTORE_NAME_NOT_PROVIDED_VALIDATE_DISK(){
+//    
+//  }
+//  
+//  public void test_DR_PGS_START_STOP_START(){
+//    
+//  }
+//
+//  public void test_DR_PGS_PERSISTENCE_START_STOP_START(){
+//    
+//  }
+//  
+//  public void test_DR_PGS_START_PAUSE_STOP(){
+//    
+//  }
+//
+//  public void test_DR_PGS_START_PAUSE_RESUME_VALIDATE_RECEIVER(){
+//    
+//  }
+
+  /**
+   * Single sending member with a receiver: vm2 creates a receiver and a
+   * replicated region; vm4 creates a replicated region referencing sender
+   * "ln1", creates and starts the sender and performs 1000 puts. The test
+   * then validates a region size of 1000 on vm4, queue contents of 0 for
+   * "ln1" on vm4, and a region size of 1000 on vm2.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_1Nodes_Put_Receiver() throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * Two sending members with a receiver: vm2 creates a receiver and a
+   * replicated region; vm4 and vm5 each create a replicated region
+   * referencing sender "ln1" and create and start the sender. vm4 performs
+   * 1000 puts. The test then validates a region size of 1000 on vm4 and vm5,
+   * queue contents of 0 for "ln1" on vm4 and vm5, and a region size of 1000
+   * on vm2.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_2Nodes_Put_Receiver() throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+      vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+      vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * Two sending members with different data policies and a receiver: vm2
+   * creates a receiver and a replicated region; vm4 creates the region with
+   * {@code DataPolicy.EMPTY} and vm5 with {@code DataPolicy.REPLICATE}, both
+   * with {@code Scope.DISTRIBUTED_ACK} and referencing sender "ln1". Both
+   * create and start the sender, and vm4 performs 1000 puts. The test then
+   * validates a region size of 1000 on vm5, queue contents of 0 for "ln1" on
+   * vm4 and vm5, and a region size of 1000 on vm2.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_2Nodes_EMPTY_Put_Receiver()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.EMPTY,
+          isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", Scope.DISTRIBUTED_ACK, DataPolicy.REPLICATE,
+          isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+      vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+
+      // The region size is not validated on vm4, whose region is created
+      // with DataPolicy.EMPTY; that check was commented out in the original.
+      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+      vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * Four sending members and two receiving members, with one replicated and
+   * one partitioned region sharing sender "ln".
+   * <p>
+   * vm2 and vm3 create a receiver, the replicated region and the partitioned
+   * region. vm4 to vm7 create the partitioned region and the replicated
+   * region (both referencing "ln"), create and start the sender and wait for
+   * it to reach the running state. vm4 then calls {@code doPuts} on the
+   * replicated region with 1000 and vm5 calls {@code doNextPuts} on the
+   * partitioned region with 1000 and 2000. The test validates a size of 1000
+   * for the partitioned region on vm4 and for the replicated region on vm5,
+   * queue contents of 0 for "ln" on vm4 and vm5, and a size of 1000 for both
+   * regions on vm2 and vm3.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PR_PGS_4Nodes_Put_Receiver_2Nodes()
+      throws Exception {
+    final String rrName = testName + "_RR";
+    final String prName = testName + "_PR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, null, isOffHeap() });
+      vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, null, isOffHeap() });
+
+      vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, null, 1, 100, isOffHeap() });
+      vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, null, 1, 100, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, "ln", 1, 100, isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, "ln", 1, 100, isOffHeap() });
+      vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, "ln", 1, 100, isOffHeap() });
+      vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+          prName, "ln", 1, 100, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, "ln", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, "ln", isOffHeap() });
+      vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, "ln", isOffHeap() });
+      vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          rrName, "ln", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+          true, 10, 100, false, false, null, true });
+      vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+          true, 10, 100, false, false, null, true });
+      vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+          true, 10, 100, false, false, null, true });
+      vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+          true, 10, 100, false, false, null, true });
+
+      vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+      vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+      vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+      vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+      vm4.invoke(WANTestBase.class, "waitForSenderRunningState",
+          new Object[] { "ln" });
+      vm5.invoke(WANTestBase.class, "waitForSenderRunningState",
+          new Object[] { "ln" });
+      vm6.invoke(WANTestBase.class, "waitForSenderRunningState",
+          new Object[] { "ln" });
+      vm7.invoke(WANTestBase.class, "waitForSenderRunningState",
+          new Object[] { "ln" });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { rrName, 1000 });
+      vm5.invoke(WANTestBase.class, "doNextPuts", new Object[] {
+          prName, 1000, 2000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          prName, 1000 });
+      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          rrName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln", 0 });
+      vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln", 0 });
+
+      // Registration of CacheClosedException as an expected exception was
+      // commented out in the original test.
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          rrName, 1000 });
+      vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          rrName, 1000 });
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          prName, 1000 });
+      vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          prName, 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * Four sending members that never call {@code startSender}, with one
+   * receiver: vm2 creates a receiver and a replicated region; vm4 to vm7
+   * create sender "ln1" and then a replicated region referencing it. vm4
+   * performs 1000 puts. The test validates a region size of 1000 and queue
+   * contents of 0 for "ln1" on vm4 to vm7, and a region size of 1000 on vm2.
+   * <p>
+   * This test is disabled intentionally:
+   * <ol>
+   * <li>In release 8.0 the queue name was changed to the old style for
+   * rolling upgrade support.</li>
+   * <li>A common parallel sender for different non-colocated regions is not
+   * supported in 8.0, so there is no need to bother about
+   * ParallelGatewaySenderQueue#convertPathToName.</li>
+   * <li>It has to be enabled in the next release.</li>
+   * <li>Version based rolling upgrade support should be provided: based on
+   * the version of GemFire, QSTRING should be used between 8.0 and versions
+   * prior to 8.0.</li>
+   * </ol>
+   *
+   * @throws Exception declared by the signature; exceptions from the steps
+   *           are caught and reported through {@code fail}
+   */
+  public void DISABLED_test_DR_PGS_NOMANUALSTART_4Nodes_Put_ValidateReceiver()
+      throws Exception {
+    final String regionName = testName + "_RR";
+    try {
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+      vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+      vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+      vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+      vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+      vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+          true, 10, 100, false, false, null, false });
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+      vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln1", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+      vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+      vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+      vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+
+      vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+      vm5.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+      vm6.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+      vm7.invoke(WANTestBase.class, "validateQueueContents", new Object[] {
+          "ln1", 0 });
+
+      // Registration of CacheClosedException as an expected exception was
+      // commented out in the original test.
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          regionName, 1000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * The test below is intentionally disabled:
+   * 1> In release 8.0 the queue name was changed back to the old style to
+   *    support rolling upgrade.
+   * 2> A common parallel sender for different non-colocated regions is not
+   *    supported in 8.0, so there is no need to bother about
+   *    ParallelGatewaySenderQueue#convertPathToName.
+   * 3> It has to be enabled in the next release.
+   * 4> Version-based rolling upgrade support should be provided: based on the
+   *    GemFire version, the appropriate QSTRING should be used between 8.0
+   *    and versions prior to 8.0.
+   */
+  public void DISABLED_test_DR_PGS_4Nodes_Put_CLOSE4NODESCACHE_RECREATE_PUT_ValidateReceiver()
+      throws Exception {
+    try {
+      // Argument arrays reused by the repeated remote invocations below.
+      final String regionName = testName + "_RR";
+      final Object[] senderId = { "ln" };
+      final Object[] senderArgs = { "ln", 2, true, 100, 10, false, false, null,
+          true };
+
+      // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+      final Object[] lnLocator = { lnPort };
+      final Object[] nyLocator = { nyPort };
+
+      // Receivers on vm2 and vm3.
+      vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+      vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+      // Caches, senders and regions on vm4..vm7.
+      vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+      vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm6.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm7.invoke(WANTestBase.class, "createSender", senderArgs);
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "startSender", senderId);
+      vm5.invoke(WANTestBase.class, "startSender", senderId);
+      vm6.invoke(WANTestBase.class, "startSender", senderId);
+      vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+      // Regions on the receivers (no sender id attached).
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, null, isOffHeap() });
+      vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, null, isOffHeap() });
+
+      // Let every sender reach the running state before the first put so that
+      // not a single event is lost.
+      vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+      vm4.invoke(WANTestBase.class, "doPuts",
+          new Object[] { regionName, 1000 });
+      vm2.invoke(WANTestBase.class, "validateRegionSize",
+          new Object[] { regionName, 1000 });
+
+      // Kill all four sender members. (Registration of CacheClosedException
+      // as an expected exception around this step is currently disabled.)
+      vm4.invoke(WANTestBase.class, "killSender", new Object[] {});
+      vm5.invoke(WANTestBase.class, "killSender", new Object[] {});
+      vm6.invoke(WANTestBase.class, "killSender", new Object[] {});
+      vm7.invoke(WANTestBase.class, "killSender", new Object[] {});
+
+      // Recreate caches, senders and regions on vm4..vm7.
+      vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+      vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm6.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm7.invoke(WANTestBase.class, "createSender", senderArgs);
+
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+      vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln", isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "startSender", senderId);
+      vm5.invoke(WANTestBase.class, "startSender", senderId);
+      vm6.invoke(WANTestBase.class, "startSender", senderId);
+      vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+      vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+      vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+      // Second round of puts, issued after the restart.
+      vm4.invoke(WANTestBase.class, "doNextPuts",
+          new Object[] { regionName, 1000, 2000 });
+
+      // Verify all buckets drained on all sender nodes.
+      vm4.invoke(WANTestBase.class,
+          "validateParallelSenderQueueAllBucketsDrained", senderId);
+      vm5.invoke(WANTestBase.class,
+          "validateParallelSenderQueueAllBucketsDrained", senderId);
+      vm6.invoke(WANTestBase.class,
+          "validateParallelSenderQueueAllBucketsDrained", senderId);
+      vm7.invoke(WANTestBase.class,
+          "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+      // Both receivers are validated against a region size of 2000.
+      // (Expected-exception registration is disabled here as well.)
+      vm2.invoke(WANTestBase.class, "validateRegionSize",
+          new Object[] { regionName, 2000 });
+      vm3.invoke(WANTestBase.class, "validateRegionSize",
+          new Object[] { regionName, 2000 });
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * The test below is intentionally disabled:
+   * 1> In release 8.0 the queue name was changed back to the old style to
+   *    support rolling upgrade.
+   * 2> A common parallel sender for different non-colocated regions is not
+   *    supported in 8.0, so there is no need to bother about
+   *    ParallelGatewaySenderQueue#convertPathToName.
+   * 3> It has to be enabled in the next release.
+   * 4> Version-based rolling upgrade support should be provided: based on the
+   *    GemFire version, the appropriate QSTRING should be used between 8.0
+   *    and versions prior to 8.0.
+   */
+  public void DISABLED_test_DR_NO_ACK_PGS_2Nodes_Put_ValidateQueue_Receiver()
+      throws Exception {
+    try {
+      // Argument arrays reused by the repeated remote invocations below.
+      final String regionName = testName + "_RR";
+      final Object[] senderId = { "ln1" };
+      final Object[] senderArgs = { "ln1", 2, true, 10, 100, false, false,
+          null, true };
+      final Object[] expectedSize = { regionName, 1000 };
+      final Object[] emptyQueue = { "ln1", 0 };
+
+      // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+      Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+          "createFirstLocatorWithDSId", new Object[] { 1 });
+      Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+          "createFirstRemoteLocator", new Object[] { 2, lnPort });
+      final Object[] lnLocator = { lnPort };
+
+      // Receiver and its region on vm2.
+      vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+      vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, null, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+      vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+
+      // The regions use DISTRIBUTED_NO_ACK scope and are created before the
+      // sender "ln1" they refer to.
+      vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln1", Scope.DISTRIBUTED_NO_ACK,
+              DataPolicy.REPLICATE, isOffHeap() });
+      vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+          new Object[] { regionName, "ln1", Scope.DISTRIBUTED_NO_ACK,
+              DataPolicy.REPLICATE, isOffHeap() });
+
+      vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+      vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+
+      vm4.invoke(WANTestBase.class, "startSender", senderId);
+      vm5.invoke(WANTestBase.class, "startSender", senderId);
+
+      vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+      // Local region size on both sender members.
+      vm4.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+      vm5.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+
+      // Queue contents on both sender members are validated against 0.
+      vm4.invoke(WANTestBase.class, "validateQueueContents", emptyQueue);
+      vm5.invoke(WANTestBase.class, "validateQueueContents", emptyQueue);
+
+      // Finally the receiver's region size.
+      vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    }
+    catch (Exception e) {
+      fail("Unexpected exception", e);
+    }
+  }
+  
+  /**
+   * The test below is intentionally disabled:
+   * 1> In release 8.0 the queue name was changed back to the old style to
+   *    support rolling upgrade.
+   * 2> A common parallel sender for different non-colocated regions is not
+   *    supported in 8.0, so there is no need to bother about
+   *    ParallelGatewaySenderQueue#convertPathToName.
+   * 3> It has to be enabled in the next release.
+   * 4> Version-based rolling upgrade support should be provided: based on the
+   *    GemFire version, the appropriate QSTRING should be used between 8.0
+   *    and versions prior to 8.0.
+   */
+  public void DISABLED_test_DR_PGS_2NODES_1NODESDOWN_Validate_Receiver()
+      throws Exception {
+    // Argument arrays reused by the repeated remote invocations below.
+    final String regionName = testName + "_RR";
+    final Object[] senderId = { "ln" };
+    final Object[] senderArgs = { "ln", 2, true, 100, 10, false, false, null,
+        true };
+
+    // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    final Object[] lnLocator = { lnPort };
+    final Object[] nyLocator = { nyPort };
+
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+
+    // NOTE(review): the receiver-side regions are created with sender id "ln"
+    // here, whereas other tests in this file pass null - confirm this is
+    // intended.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    pauseWaitCriteria(60000);
+
+    // Puts run asynchronously on vm4 while, one second later, vm5 is killed.
+    // (Registration of CacheClosedException as an expected exception around
+    // this step is currently disabled.)
+    AsyncInvocation putsOnVm4 = vm4.invokeAsync(
+        ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts0",
+        new Object[] { regionName, 1000 });
+    pause(1000);
+    AsyncInvocation killOfVm5 = vm5.invokeAsync(WANTestBase.class,
+        "killSender");
+    try {
+      putsOnVm4.join();
+      killOfVm5.join();
+    }
+    catch (Exception e) {
+      fail("UnExpected Exception", e);
+    }
+
+    // Log the queue size on vm4 before and after waiting for the drain.
+    Integer sizeBeforeDrain = (Integer)vm4.invoke(WANTestBase.class,
+        "getQueueContentSize", senderId);
+    getLogWriter().info("The size of the queue is in vm4 " + sizeBeforeDrain);
+
+    vm4.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    Integer sizeAfterDrain = (Integer)vm4.invoke(WANTestBase.class,
+        "getQueueContentSize", senderId);
+    getLogWriter().info("The size of the queue is in vm4 " + sizeAfterDrain);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+  }
+  
+  /**
+   * The test below is intentionally disabled:
+   * 1> In release 8.0 the queue name was changed back to the old style to
+   *    support rolling upgrade.
+   * 2> A common parallel sender for different non-colocated regions is not
+   *    supported in 8.0, so there is no need to bother about
+   *    ParallelGatewaySenderQueue#convertPathToName.
+   * 3> It has to be enabled in the next release.
+   * 4> Version-based rolling upgrade support should be provided: based on the
+   *    GemFire version, the appropriate QSTRING should be used between 8.0
+   *    and versions prior to 8.0.
+   */
+  public void DISABLED_test_DR_PGS_4NODES_2NODESDOWN_Validate_Receiver()
+      throws Exception {
+    // Argument arrays reused by the repeated remote invocations below.
+    final String regionName = testName + "_RR";
+    final Object[] senderId = { "ln" };
+    final Object[] senderArgs = { "ln", 2, true, 100, 10, false, false, null,
+        true };
+
+    // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    final Object[] lnLocator = { lnPort };
+    final Object[] nyLocator = { nyPort };
+
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm6.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm7.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    // NOTE(review): the receiver-side regions are created with sender id "ln"
+    // here, whereas other tests in this file pass null - confirm this is
+    // intended.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    pauseWaitCriteria(60000);
+
+    // Interleave two asynchronous put runs (vm7, vm6) with the killing of two
+    // members (vm4, vm5). (Registration of CacheClosedException as an
+    // expected exception around this step is currently disabled.)
+    AsyncInvocation putsOnVm7 = vm7.invokeAsync(
+        ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts0",
+        new Object[] { regionName, 10000 });
+    pauseWaitCriteria(1000);
+    AsyncInvocation killOfVm4 = vm4.invokeAsync(WANTestBase.class,
+        "killSender");
+    pauseWaitCriteria(2000);
+    AsyncInvocation putsOnVm6 = vm6.invokeAsync(
+        ReplicatedRegion_ParallelWANPropogationDUnitTest.class, "doPuts1",
+        new Object[] { regionName, 10000 });
+    pauseWaitCriteria(1500);
+    AsyncInvocation killOfVm5 = vm5.invokeAsync(WANTestBase.class,
+        "killSender");
+    try {
+      putsOnVm7.join();
+      killOfVm4.join();
+      putsOnVm6.join();
+      killOfVm5.join();
+    }
+    catch (Exception e) {
+      fail("UnExpected Exception", e);
+    }
+
+    // Only the two members that were not killed are checked for drained
+    // queues.
+    vm6.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm7.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 10000 });
+  }
+  
+  public static void doPuts0(String regionName, int numPuts) {
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        getLogWriter().info("Put : key : " + i);
+        r.put(i, "0_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void doPuts1(String regionName, int numPuts){
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        getLogWriter().info("Put : key : " + i);
+        r.put(i, "1_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+  
+  public static void doPuts2(String regionName, int numPuts){
+    ExpectedException exp = addExpectedException(ForceReattemptException.class
+        .getName());
+    ExpectedException exp1 = addExpectedException(CacheClosedException.class
+        .getName());
+    try {
+      Region r = cache.getRegion(Region.SEPARATOR + regionName);
+      assertNotNull(r);
+      for (long i = 0; i < numPuts; i++) {
+        getLogWriter().info("Put : key : " + i);
+        r.put(i, "2_" + i);
+      }
+    } finally {
+      exp.remove();
+      exp1.remove();
+    }
+  }
+    
+  /**
+   * Test to validate that puts on a DR with no-ack scope on multiple nodes are
+   * propagated to the parallel queue on multiple nodes.
+   */
+  
+  /**
+   * Test to validate that a single put in a DR is propagated to the remote
+   * site through a ParallelGatewaySender.
+   */
+  
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
new file mode 100644
index 0000000..4e15ab7
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/SenderWithTransportFilterDUnitTest.java
@@ -0,0 +1,266 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Properties;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
+import java.util.zip.CheckedOutputStream;
+
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.DiskStoreFactory;
+import com.gemstone.gemfire.cache.wan.GatewayReceiver;
+import com.gemstone.gemfire.cache.wan.GatewayReceiverFactory;
+import com.gemstone.gemfire.cache.wan.GatewaySenderFactory;
+import com.gemstone.gemfire.cache.wan.GatewayTransportFilter;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.wan.CompressionInputStream;
+import com.gemstone.gemfire.internal.cache.wan.CompressionOutputStream;
+import com.gemstone.gemfire.internal.cache.wan.InternalGatewaySenderFactory;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyLocatorCallback;
+
+import dunit.VM;
+
+public class SenderWithTransportFilterDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the test case; the name is passed unchanged to the
+   * {@link WANTestBase} constructor.
+   *
+   * @param name name of the test
+   */
+  public SenderWithTransportFilterDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the inherited {@link WANTestBase} set-up; this test adds no fixture
+   * state of its own.
+   *
+   * @throws Exception if the inherited set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Runs 100 puts on a replicated region attached to a non-parallel sender
+   * created with transport filters (vm3), and validates a region size of 100
+   * on the member that hosts the receiver with transport filters (vm2).
+   */
+  public void testSerialSenderWithTansportFilter() {
+    final String regionName = testName + "_RR";
+    final Object[] senderId = { "ln" };
+    final Object[] expectedSize = { regionName, 100 };
+
+    // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // Receiving member.
+    vm2.invoke(SenderWithTransportFilterDUnitTest.class,
+        "createReceiverWithTransportFilters", new Object[] { nyPort });
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, null, isOffHeap() });
+
+    // Sending member; the third sender argument (isParallel) is false.
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm3.invoke(SenderWithTransportFilterDUnitTest.class,
+        "createSenderWithTransportFilter",
+        new Object[] { "ln", 2, false, 100, 1, false, false, true });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion",
+        new Object[] { regionName, "ln", isOffHeap() });
+    vm3.invoke(WANTestBase.class, "startSender", senderId);
+
+    vm3.invoke(WANTestBase.class, "doPuts", expectedSize);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Runs 100 puts on a partitioned region attached to a parallel sender
+   * created with transport filters (vm3), and validates a region size of 100
+   * on the member that hosts the receiver with transport filters (vm2).
+   */
+  public void testParallelSenderWithTansportFilter() {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+    final Object[] expectedSize = { regionName, 100 };
+
+    // Locators are started on vm0 and vm1; vm1 is given vm0's locator port.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // Receiving member.
+    vm2.invoke(SenderWithTransportFilterDUnitTest.class,
+        "createReceiverWithTransportFilters", new Object[] { nyPort });
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { regionName, null, 0, 10, isOffHeap() });
+
+    // Sending member; the third sender argument (isParallel) is true.
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm3.invoke(SenderWithTransportFilterDUnitTest.class,
+        "createSenderWithTransportFilter",
+        new Object[] { "ln", 2, true, 100, 1, false, false, true });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion",
+        new Object[] { regionName, "ln", 0, 10, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "startSender", senderId);
+
+    vm3.invoke(WANTestBase.class, "doPuts", expectedSize);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+  
+  /**
+   * Creates a cache connected to the locator at {@code localhost[locPort]}
+   * and starts a GatewayReceiver on it with two transport filters installed:
+   * first a {@link CheckSumTranportFilter}, then a
+   * {@link CompressionTranportFilter}.
+   *
+   * @param locPort port of the locator to connect to
+   * @return the port that was set as both start and end port of the receiver
+   */
+  public static int createReceiverWithTransportFilters(int locPort) {
+    WANTestBase test = new WANTestBase(testName);
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "localhost[" + locPort
+        + "]");
+
+    InternalDistributedSystem ds = test.getSystem(props);
+    cache = CacheFactory.create(ds);
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    // Start and end of the port range are set to the same value.
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    // The filters are registered directly; the order is kept as before
+    // (checksum first, compression second).
+    fact.addGatewayTransportFilter(
+        new CheckSumTranportFilter("CheckSumTranportFilter"));
+    fact.addGatewayTransportFilter(
+        new CompressionTranportFilter("CompressionTranportFilter"));
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    }
+    catch (IOException e) {
+      // Pass the exception on so that the cause is part of the test failure
+      // rather than only being printed to stderr.
+      fail("Test " + test.getName()
+          + " failed to start GatewayRecevier on port " + port, e);
+    }
+    return port;
+  }
+
+  /**
+   * Creates a gateway sender on the current cache with two transport filters
+   * installed: first a {@link CompressionTranportFilter}, then a
+   * {@link CheckSumTranportFilter}. A disk store named after the sender is
+   * always created in a fresh directory and assigned to the sender.
+   *
+   * @param dsName id of the sender; also used as the disk store name and as
+   *        the prefix of the disk directory
+   * @param remoteDsId id of the remote distributed system
+   * @param isParallel whether to create a parallel sender
+   * @param maxMemory value for the sender's maximum queue memory
+   * @param batchSize value for the sender's batch size
+   * @param isConflation whether batch conflation is enabled
+   * @param isPersistent whether persistence is enabled on the sender
+   * @param isManulaStart whether the sender is created in manual-start mode;
+   *        applied to parallel and serial senders alike
+   */
+  public static void createSenderWithTransportFilter(String dsName,
+      int remoteDsId, boolean isParallel, Integer maxMemory, Integer batchSize,
+      boolean isConflation, boolean isPersistent, boolean isManulaStart) {
+    File persistentDirectory = new File(dsName + "_disk_"
+        + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
+    persistentDirectory.mkdir();
+    DiskStoreFactory dsf = cache.createDiskStoreFactory();
+    File[] dirs1 = new File[] { persistentDirectory };
+
+    GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
+    if (isParallel) {
+      gateway.setParallel(true);
+    }
+    gateway.setMaximumQueueMemory(maxMemory);
+    gateway.setBatchSize(batchSize);
+    // Previously only the serial branch honoured this flag; a parallel sender
+    // silently ignored it.
+    gateway.setManualStart(isManulaStart);
+    ((InternalGatewaySenderFactory)gateway)
+        .setLocatorDiscoveryCallback(new MyLocatorCallback());
+    gateway.addGatewayTransportFilter(
+        new CompressionTranportFilter("CompressionTranportFilter"));
+    gateway.addGatewayTransportFilter(
+        new CheckSumTranportFilter("CheckSumTranportFilter"));
+    gateway.setBatchConflationEnabled(isConflation);
+    if (isPersistent) {
+      gateway.setPersistenceEnabled(true);
+    }
+    // The disk store is created and assigned whether or not persistence is
+    // enabled, as before.
+    DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
+    gateway.setDiskStoreName(store.getName());
+    gateway.create(dsName, remoteDsId);
+  }
+
+  static class CompressionTranportFilter implements GatewayTransportFilter {
+
+    private String name;
+    
+    public CompressionTranportFilter(String name){
+      this.name = name;
+    }
+    
+    public String toString(){
+      return this.name;
+    }
+    public InputStream getInputStream(InputStream stream) {
+      return new CompressionInputStream(stream);
+      // return new ZipInputStream(stream);
+    }
+
+    public OutputStream getOutputStream(OutputStream stream) {
+      return new CompressionOutputStream(stream);
+      // return new ZipOutputStream(stream);
+    }
+
+    public void close() {
+      // TODO Auto-generated method stub
+    }
+
+  }
+  
+  /**
+   * Transport filter that wraps the given streams in a
+   * {@link CheckedInputStream} or {@link CheckedOutputStream} using Adler-32.
+   * Every stream gets its own {@link Adler32} instance, so bytes flowing
+   * through one stream never update the checksum of another.
+   */
+  static class CheckSumTranportFilter implements GatewayTransportFilter {
+
+    /** Label returned by {@link #toString()}. */
+    private final String name;
+
+    public CheckSumTranportFilter(String name) {
+      this.name = name;
+    }
+
+    public String toString() {
+      return this.name;
+    }
+
+    /** Wraps {@code stream} in a checked stream with a fresh checksum. */
+    public InputStream getInputStream(InputStream stream) {
+      return new CheckedInputStream(stream, new Adler32());
+    }
+
+    /** Wraps {@code stream} in a checked stream with a fresh checksum. */
+    public OutputStream getOutputStream(OutputStream stream) {
+      return new CheckedOutputStream(stream, new Adler32());
+    }
+
+    /** Does nothing; the filter holds no resources. */
+    public void close() {
+    }
+
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..211f726
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/ShutdownAllPersistentGatewaySenderDUnitTest.java
@@ -0,0 +1,197 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import java.util.Set;
+
+import com.gemstone.gemfire.admin.AdminDistributedSystemFactory;
+import com.gemstone.gemfire.admin.AdminException;
+import com.gemstone.gemfire.admin.DistributedSystemConfig;
+import com.gemstone.gemfire.admin.internal.AdminDistributedSystemImpl;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.internal.cache.CacheObserverAdapter;
+import com.gemstone.gemfire.internal.cache.CacheObserverHolder;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * Checks that a gateway sender keeps delivering to a remote site whose members
+ * are stopped with ShutdownAll while puts are still in flight and are then
+ * restarted on top of their persistent partitioned regions.
+ * <p>
+ * VM roles used by the test: vm0 and vm1 host the locators of the two sites,
+ * vm2 and vm3 form the receiving site (vm2 hosts the receiver), and vm4 is the
+ * sending site.
+ */
+public class ShutdownAllPersistentGatewaySenderDUnitTest extends WANTestBase {
+  private static final long serialVersionUID = 1L;
+
+  /** Upper bound, in milliseconds, used for every join and wait below. */
+  private static final long MAX_WAIT = 70000;
+
+  /** Number of entries put into the region on the sending site. */
+  private static final int NUM_KEYS = 1000;
+
+  public ShutdownAllPersistentGatewaySenderDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    addExpectedException("Cache is being closed by ShutdownAll");
+  }
+
+  /**
+   * Runs ShutdownAll on the receiving site while the sending site is putting
+   * {@link #NUM_KEYS} entries, restarts the receiving site and then verifies
+   * that both sites end up with {@link #NUM_KEYS} entries.
+   *
+   * @throws Exception if a remote invocation fails or a wait is interrupted
+   */
+  public void testGatewaySender() throws Exception {
+    addExpectedException("Cache is shutting down");
+
+    // Locators: vm0 for the sending site (DS id 1), vm1 for the receiving
+    // site (DS id 2).
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // Receiving site: caches in vm2 and vm3, receiver in vm2 only.
+    vm2.invoke(WANTestBase.class, "createCache", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { nyPort });
+    vm2.invoke(WANTestBase.class, "createReceiverAfterCache",
+        new Object[] { nyPort });
+
+    vm2.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+        new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+        new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+    // Sending site: cache, started sender "ln" and region in vm4.
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 400, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+        new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+    // Install a CacheObserver on the receiving members that holds ShutdownAll
+    // back until at least two entries have arrived in the region.
+    SerializableRunnable waitAtShutdownAll = new SerializableRunnable() {
+      @Override
+      public void run() {
+        LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER = true;
+        CacheObserverHolder.setInstance(new CacheObserverAdapter() {
+          @Override
+          public void beforeShutdownAll() {
+            final Region region = cache.getRegion(testName + "_PR");
+            waitForCriterion(new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return region.size() >= 2;
+              }
+
+              @Override
+              public String description() {
+                return "Wait for wan to have processed several events";
+              }
+            }, 30000, 100, true);
+          }
+        });
+      }
+    };
+    vm2.invoke(waitAtShutdownAll);
+    vm3.invoke(waitAtShutdownAll);
+
+    AsyncInvocation vm4_future = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { testName + "_PR", NUM_KEYS });
+
+    // ShutdownAll is suspended at the observer, so the puts continue meanwhile.
+    AsyncInvocation future = shutDownAllMembers(vm2, 2, MAX_WAIT);
+    joinAndVerify(future, "ShutdownAll of the receiving site");
+
+    // Restart the receiving site (vm2 and vm3). vm3 creates its region
+    // asynchronously while vm2 creates the same region in parallel.
+    getLogWriter().info("restart in VM2");
+    vm2.invoke(WANTestBase.class, "createCache", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { nyPort });
+    AsyncInvocation vm3_future = vm3.invokeAsync(WANTestBase.class,
+        "createPersistentPartitionedRegion", new Object[] { testName + "_PR",
+            "ln", 1, 100, isOffHeap() });
+    vm2.invoke(WANTestBase.class, "createPersistentPartitionedRegion",
+        new Object[] { testName + "_PR", "ln", 1, 100, isOffHeap() });
+    joinAndVerify(vm3_future, "Region re-creation in vm3");
+
+    // Log the region size seen by vm3 before the receiver is started again
+    // (the log text still says "vm1"/"gatewayhub").
+    vm3.invoke(new SerializableRunnable() {
+      public void run() {
+        final Region region = cache.getRegion(testName + "_PR");
+        cache.getLogger().info(
+            "vm1's region size before restart gatewayhub is " + region.size());
+      }
+    });
+    vm2.invoke(WANTestBase.class, "createReceiverAfterCache",
+        new Object[] { nyPort });
+
+    // Wait for the puts in vm4 to finish and check the sending side.
+    joinAndVerify(vm4_future, "doPuts in vm4");
+    vm4.invoke(new SerializableRunnable() {
+      public void run() {
+        Region region = cache.getRegion(testName + "_PR");
+        assertEquals(NUM_KEYS, region.size());
+      }
+    });
+
+    // Verify in vm2 that the receiving site got every entry via the gateway.
+    vm2.invoke(new SerializableRunnable() {
+      public void run() {
+        final Region region = cache.getRegion(testName + "_PR");
+
+        cache.getLogger().info(
+            "vm1's region size after restart gatewayhub is " + region.size());
+        waitForCriterion(new WaitCriterion() {
+          public boolean done() {
+            Object lastvalue = region.get(NUM_KEYS - 1);
+            if (lastvalue != null && lastvalue.equals(NUM_KEYS - 1)) {
+              region.getCache().getLogger().info(
+                  "Last key has arrived, its value is " + lastvalue
+                      + ", end of wait.");
+              return true;
+            }
+            // The last key is not there yet; the wait is also over once the
+            // region holds the full number of entries.
+            return region.size() == NUM_KEYS;
+          }
+
+          public String description() {
+            return "Waiting for destination region to reach size: " + NUM_KEYS
+                + ", current is " + region.size();
+          }
+        }, MAX_WAIT, 100, true);
+        assertEquals(NUM_KEYS, region.size());
+      }
+    });
+
+  }
+
+  /**
+   * Waits up to {@link #MAX_WAIT} milliseconds for an asynchronous invocation
+   * and fails the test if it is still running afterwards or ended with an
+   * exception. A bare {@code join(timeout)} reports neither condition.
+   *
+   * @param async the invocation to wait for
+   * @param description short text naming the invocation in failure messages
+   * @throws InterruptedException if the calling thread is interrupted while
+   *         waiting
+   */
+  private static void joinAndVerify(AsyncInvocation async, String description)
+      throws InterruptedException {
+    async.join(MAX_WAIT);
+    if (async.isAlive()) {
+      throw new AssertionError(description + " did not complete within "
+          + MAX_WAIT + " ms");
+    }
+    if (async.exceptionOccurred()) {
+      AssertionError failure = new AssertionError(description + " failed");
+      failure.initCause(async.getException());
+      throw failure;
+    }
+  }
+
+  /**
+   * Asynchronously runs ShutdownAll from the given VM through an admin
+   * distributed system and asserts, inside that VM, how many members were
+   * reported as shut down.
+   *
+   * @param vm the VM in which the admin connection is created
+   * @param expnum expected number of members returned by ShutdownAll
+   * @param timeout value passed through to {@code shutDownAllMembers}
+   * @return the running invocation; the caller must join it and check its
+   *         outcome, otherwise the assertion inside it goes unnoticed
+   */
+  private AsyncInvocation shutDownAllMembers(VM vm, final int expnum,
+      final long timeout) {
+    return vm.invokeAsync(new SerializableRunnable("Shutdown all the members") {
+
+      public void run() {
+        AdminDistributedSystemImpl adminDS = null;
+        try {
+          DistributedSystemConfig config = AdminDistributedSystemFactory
+              .defineDistributedSystem(cache.getDistributedSystem(), "");
+          adminDS = (AdminDistributedSystemImpl)AdminDistributedSystemFactory
+              .getDistributedSystem(config);
+          adminDS.connect();
+          Set<?> members = adminDS.shutDownAllMembers(timeout);
+          int num = members == null ? 0 : members.size();
+          assertEquals(expnum, num);
+        }
+        catch (AdminException e) {
+          throw new RuntimeException(e);
+        }
+        finally {
+          // Always release the admin connection, even when the assertion or
+          // the shutdown call fails.
+          if (adminDS != null) {
+            adminDS.disconnect();
+          }
+        }
+      }
+    });
+  }
+
+}

Reply via email to