http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
new file mode 100644
index 0000000..447f5ea
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_1_DUnitTest.java
@@ -0,0 +1,603 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.wan.BatchException70;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * All the test cases are similar to SerialWANPropogationDUnitTest except that
+ * we create concurrent serial GatewaySenders (concurrency of 3 to 5,
+ * depending on the test).
+ * @author skumar
+ *
+ */
+public class ConcurrentWANPropogation_1_DUnitTest extends WANTestBase {
+
+  /**
+   * Creates the test case and forwards its name to the WANTestBase
+   * constructor.
+   *
+   * @param name the name handed to the superclass constructor; presumably
+   *          the name of the test method to run (TODO confirm)
+   */
+  public ConcurrentWANPropogation_1_DUnitTest(String name) {
+    super(name);
+  }
+
+  // Serialization version id; presumably needed because a superclass is
+  // Serializable (TODO confirm).
+  private static final long serialVersionUID = 1L;
+  
+  /**
+   * Fills a replicated region on the local site while the remote site has
+   * neither cache nor receiver, then brings the remote site up and verifies
+   * that one local member and both remote members hold all 1000 entries.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape
+   */
+  public void testReplicatedSerialPropagation_withoutRemoteSite()
+      throws Exception {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // local site: caches on vm4..vm7
+    final Object[] localCacheArgs = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", localCacheArgs);
+
+    // keep the batch size high enough to reduce the number of exceptions
+    // in the log
+    final Object[] senderArgs = new Object[] { "ln", 2, false, 100, 400,
+        false, false, null, true, 4, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    final Object[] senderIdArgs = new Object[] { "ln" };
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    final Object[] localRegionArgs =
+        new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+
+    addExpectedException(BatchException70.class.getName());
+    addExpectedException(ServerOperationException.class.getName());
+
+    // the puts are done before anything exists on the remote site
+    final Object[] nameAndCount = new Object[] { regionName, 1000 };
+    vm4.invoke(WANTestBase.class, "doPuts", nameAndCount);
+
+    // remote site comes up afterwards: caches, regions, then receivers
+    final Object[] remotePortArgs = new Object[] { nyPort };
+    vm2.invoke(WANTestBase.class, "createCache", remotePortArgs);
+    vm3.invoke(WANTestBase.class, "createCache", remotePortArgs);
+
+    final Object[] remoteRegionArgs =
+        new Object[] { regionName, null, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+
+    vm2.invoke(WANTestBase.class, "createReceiver2", remotePortArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver2", remotePortArgs);
+
+    vm4.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+  }
+  
+  /**
+   * Sets up receivers on vm2/vm3 and concurrent senders on vm4/vm5, puts
+   * 1000 entries into the replicated region from vm4 and verifies that
+   * both remote members end up with 1000 entries.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape
+   */
+  public void testReplicatedSerialPropagation() throws Exception {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] remotePortArgs = new Object[] { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", remotePortArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", remotePortArgs);
+
+    final Object[] localCacheArgs = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", localCacheArgs);
+
+    final Object[] senderArgs = new Object[] { "ln", 2, false, 100, 10,
+        false, false, null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    final Object[] remoteRegionArgs =
+        new Object[] { regionName, null, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+
+    final Object[] senderIdArgs = new Object[] { "ln" };
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    final Object[] localRegionArgs =
+        new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+
+    final Object[] nameAndCount = new Object[] { regionName, 1000 };
+    vm4.invoke(WANTestBase.class, "doPuts", nameAndCount);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+  }
+  
+  
+  /**
+   * Puts 1000 entries from the local site, calls killSender on all four
+   * local members, rebuilds caches, senders and regions on the local site,
+   * repeats the 1000 puts and finally expects both remote members to hold
+   * 1000 entries.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape
+   */
+  public void testReplicatedSerialPropagationWithLocalSiteClosedAndRebuilt()
+      throws Exception {
+    final String regionName = testName + "_RR";
+
+    addExpectedException("Broken pipe");
+    addExpectedException("Connection reset");
+    addExpectedException("Unexpected IOException");
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] remotePortArgs = new Object[] { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", remotePortArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", remotePortArgs);
+
+    final Object[] localCacheArgs = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", localCacheArgs);
+
+    final Object[] senderArgs = new Object[] { "ln", 2, false, 100, 10,
+        false, false, null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    final Object[] remoteRegionArgs =
+        new Object[] { regionName, null, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", remoteRegionArgs);
+
+    final Object[] senderIdArgs = new Object[] { "ln" };
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    final Object[] localRegionArgs =
+        new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", localRegionArgs);
+
+    final Object[] nameAndCount = new Object[] { regionName, 1000 };
+    vm4.invoke(WANTestBase.class, "doPuts", nameAndCount);
+
+    // ---- close the local site and build it again ----
+    final Object[] noArgs = new Object[] { };
+    vm4.invoke(WANTestBase.class, "killSender", noArgs);
+    vm5.invoke(WANTestBase.class, "killSender", noArgs);
+    vm6.invoke(WANTestBase.class, "killSender", noArgs);
+    vm7.invoke(WANTestBase.class, "killSender", noArgs);
+
+    // logged only; the value is not asserted on
+    Integer regionSize = (Integer)vm2.invoke(WANTestBase.class,
+        "getRegionSize", new Object[] { regionName });
+    getLogWriter().info("Region size on remote is: " + regionSize);
+
+    vm4.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", localCacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", localCacheArgs);
+
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    final Object[] removeOnExceptionArgs = new Object[] { "ln", true };
+    vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException",
+        removeOnExceptionArgs);
+    vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException",
+        removeOnExceptionArgs);
+
+    final Object[] rebuiltRegionArgs =
+        new Object[] { regionName, "ln", isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", rebuiltRegionArgs);
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", rebuiltRegionArgs);
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", rebuiltRegionArgs);
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", rebuiltRegionArgs);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+
+    addExpectedException(EntryExistsException.class.getName());
+    addExpectedException(BatchException70.class.getName());
+    addExpectedException(ServerOperationException.class.getName());
+
+    vm4.invoke(WANTestBase.class, "doPuts", nameAndCount);
+    // ---- local site rebuilt ----
+
+    // verify remote site receives all the events
+    vm2.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", nameAndCount);
+  }
+  
+  /**
+   * Two regions (RR_1, RR_2) are configured with the same sender "ln".
+   * Puts into RR_1 run asynchronously on vm4 while 500 puts into RR_2 run
+   * synchronously; RR_2 is then destroyed from vm4. Afterwards vm2 must hold
+   * 1000 entries in RR_1 and vm3 must hold 500 entries in RR_2.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape,
+   *           or InterruptedException if the wait for the asynchronous
+   *           puts is interrupted
+   */
+  public void testReplicatedSerialPropagationWithLocalRegionDestroy()
+      throws Exception {
+    final String rr1 = testName + "_RR_1";
+    final String rr2 = testName + "_RR_2";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    //these are part of remote site
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    //these are part of local site
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    //senders are created on local site
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 20, false, false, null, true, 3,
+        OrderPolicy.THREAD });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 20, false, false, null, true, 3,
+        OrderPolicy.THREAD });
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+
+    //create another RR (RR_2) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+
+    //start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+
+    //create another RR (RR_2) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+
+    //start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { rr1, 1000 });
+    //do puts in RR_2 in main thread
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { rr2, 500 });
+    //destroy RR_2 after above puts are complete
+    vm4.invoke(WANTestBase.class, "destroyRegion", new Object[] { rr2 });
+
+    // An interrupt while waiting is propagated (the method declares
+    // "throws Exception") instead of being printed and converted into a
+    // message-less fail() that loses the cause.
+    // NOTE(review): join() only waits; confirm whether a failure inside the
+    // asynchronous doPuts needs an explicit check here.
+    inv1.join();
+
+    //sleep for some time to let all the events propagate to remote site
+    // NOTE(review): 20 ms is very short; presumably validateRegionSize does
+    // the real waiting - confirm against WANTestBase.
+    Thread.sleep(20);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        rr1, 1000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        rr2, 500 });
+  }
+
+  /**
+   * One region and a sender are configured on the local site, one region
+   * and a receiver on the remote site. 10000 puts into the local region are
+   * started asynchronously and the remote region is destroyed on vm2 while
+   * they may still be in progress. The local region must end up with all
+   * 10000 entries and the sender queue must not be empty.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape,
+   *           or InterruptedException if the wait for the asynchronous
+   *           puts is interrupted
+   */
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy()
+      throws Exception {
+    final String rr1 = testName + "_RR_1";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    //these are part of remote site
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    //these are part of local site
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    //senders are created on local site
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 500, false, false, null, true, 5,
+        OrderPolicy.KEY });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 500, false, false, null, true, 5,
+        OrderPolicy.KEY });
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+
+    //start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+
+    addExpectedException(BatchException70.class.getName());
+    addExpectedException(ServerOperationException.class.getName());
+
+    //start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { rr1, 10000 });
+    //destroy RR_1 in remote site
+    vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { rr1 });
+
+    // An interrupt while waiting is propagated (the method declares
+    // "throws Exception") instead of being printed and converted into a
+    // message-less fail() that loses the cause.
+    inv1.join();
+
+    //verify that all is well in local site. All the events should be
+    //present in local region
+    vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        rr1, 10000 });
+    //assuming some events might have been dispatched before the remote
+    //region was destroyed, sender's region queue will have fewer than
+    //10000 events but the queue will not be empty.
+    //NOTE: this much verification might be sufficient in DUnit. Hydra will
+    //take care of more in depth validations.
+    vm4.invoke(WANTestBase.class,
+        "verifyRegionQueueNotEmptyForConcurrentSender",
+        new Object[] { "ln" });
+  }
+  
+  /**
+   * Two regions (RR_1, RR_2) are configured on the local site with the same
+   * sender, and the same two regions exist on the remote site. RR_2 is
+   * destroyed on the remote member vm2 before any put is issued; then 1000
+   * puts go into RR_2 synchronously, followed by 1000 puts into RR_1 via an
+   * asynchronous invocation that is joined immediately. vm2 must still end
+   * up with 1000 entries in RR_1.
+   *
+   * NOTE(review): earlier documentation described the destroy as happening
+   * "in the middle" of concurrent puts; the code below does not do that.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape,
+   *           or InterruptedException if the wait for the asynchronous
+   *           puts is interrupted
+   */
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy2()
+      throws Exception {
+    final String rr1 = testName + "_RR_1";
+    final String rr2 = testName + "_RR_2";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    //these are part of remote site
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    //these are part of local site
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    //senders are created on local site
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 200, false, false, null, true, 5,
+        OrderPolicy.THREAD });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 200, false, false, null, true, 5,
+        OrderPolicy.THREAD });
+
+    //create one RR (RR_1) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+
+    //create another RR (RR_2) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+
+    //start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    //create one RR (RR_1) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+
+    //create another RR (RR_2) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+
+    //destroy RR_2 on remote site (before any puts are issued)
+    vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { rr2 });
+
+    //expected exceptions in the logs
+    addExpectedException(BatchException70.class.getName());
+    addExpectedException(ServerOperationException.class.getName());
+
+    //do puts in RR_2 synchronously
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { rr2, 1000 });
+
+    //start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { rr1, 1000 });
+
+    // An interrupt while waiting is propagated (the method declares
+    // "throws Exception") instead of being printed and converted into a
+    // message-less fail() that loses the cause.
+    inv1.join();
+
+    //though region RR_2 is destroyed on the remote site, RR_1 there should
+    //still get all the events put in it in local site
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        rr1, 1000 });
+
+  }
+
+  /**
+   * Two regions (RR_1, RR_2) share sender "ln" on the local site and exist
+   * on the remote site as well. setRemoveFromQueueOnException("ln", true)
+   * is invoked on both sender members before the senders are started.
+   * Puts into both regions are started asynchronously on vm4 and RR_2 is
+   * destroyed on the remote member vm2 while they may still be running.
+   * vm2 must end up with 1000 entries in RR_1. In all cases the
+   * REMOVE_FROM_QUEUE_ON_EXCEPTION system property is set back to "False"
+   * in this JVM and on vm4/vm5 once the puts have been started.
+   *
+   * @throws Exception whatever the invoked WANTestBase helpers let escape,
+   *           or InterruptedException if the wait for the asynchronous
+   *           puts is interrupted
+   */
+  public void testReplicatedSerialPropagationWithRemoteRegionDestroy3()
+      throws Exception {
+    final String rr1 = testName + "_RR_1";
+    final String rr2 = testName + "_RR_2";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    // these are part of remote site
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    // these are part of local site
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    // senders are created on local site
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 200, false, false, null, true, 5,
+        OrderPolicy.THREAD });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        "ln", 2, false, 100, 200, false, false, null, true, 5,
+        OrderPolicy.THREAD });
+
+    // create one RR (RR_1) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, null, isOffHeap() });
+
+    // create another RR (RR_2) on remote site
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, null, isOffHeap() });
+
+    // enable remove-from-queue-on-exception on both senders before they
+    // are started
+    vm4.invoke(WANTestBase.class, "setRemoveFromQueueOnException",
+        new Object[] { "ln", true });
+    vm5.invoke(WANTestBase.class, "setRemoveFromQueueOnException",
+        new Object[] { "ln", true });
+
+    // start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    // create one RR (RR_1) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr1, "ln", isOffHeap() });
+
+    // create another RR (RR_2) on local site
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        rr2, "ln", isOffHeap() });
+
+    addExpectedException(BatchException70.class.getName());
+    addExpectedException(ServerOperationException.class.getName());
+
+    // start puts in RR_1 in another thread
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { rr1, 1000 });
+    // start puts in RR_2 in another thread
+    AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { rr2, 1000 });
+    // destroy RR_2 on remote site in the middle
+    vm2.invoke(WANTestBase.class, "destroyRegion", new Object[] { rr2 });
+
+    try {
+      // The joins sit inside the try so that the property reset below also
+      // runs when the wait is interrupted; the InterruptedException itself
+      // is propagated (the method declares "throws Exception").
+      inv1.join();
+      inv2.join();
+
+      // though region RR_2 is destroyed on the remote site, RR_1 there
+      // should still get all the events put in it in local site
+      vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+          rr1, 1000 });
+    } finally {
+      System.setProperty(
+          "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False");
+
+      // one runnable, shipped to both sender members
+      CacheSerializableRunnable resetProperty =
+          new CacheSerializableRunnable("UnSetting system property ") {
+        public void run2() throws CacheException {
+          System.setProperty(
+              "gemfire.GatewaySender.REMOVE_FROM_QUEUE_ON_EXCEPTION", "False");
+        }
+      };
+      vm4.invoke(resetProperty);
+      vm5.invoke(resetProperty);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
new file mode 100644
index 0000000..3f95552
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentWANPropogation_2_DUnitTest.java
@@ -0,0 +1,480 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.EntryExistsException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import 
com.gemstone.gemfire.internal.cache.wan.WANTestBase.MyGatewayEventFilter;
+
+import dunit.AsyncInvocation;
+
+/**
+ * All the test cases are similar to SerialWANPropogationDUnitTest except that
+ * we create a concurrent serial GatewaySender with concurrency of 4
+ * @author skumar
+ *
+ */
+public class ConcurrentWANPropogation_2_DUnitTest extends WANTestBase {
+
+  /**
+   * Creates the test case.
+   *
+   * @param name the test name, passed unchanged to the WANTestBase constructor
+   */
+  public ConcurrentWANPropogation_2_DUnitTest(String name) {
+    super(name);
+  }
+
+  private static final long serialVersionUID = 1L;
+  
+  /**
+   * Runs doHeavyPuts for 150 entries on vm4 against a replicated region
+   * attached to concurrent sender "ln" (OrderPolicy.KEY) and expects both
+   * receiving members (vm2, vm3) to report a region size of 150.
+   */
+  public void testSerialReplicatedWanWithOverflow() {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 150;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    // Receivers are created with the remote locator port.
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    // Caches on vm4..vm7 are created with the local locator port.
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    //keep the maxQueueMemory low enough to trigger eviction
+    final Object[] senderArgs = { senderId, 2, false, 50, 5, false, false,
+        null, true, 5, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doHeavyPuts", new Object[] { regionName,
+        entryCount });
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Runs doPuts for 1000 entries on vm4 against a replicated region attached
+   * to concurrent sender "ln" (OrderPolicy.THREAD) and expects vm2 and vm3 to
+   * report a region size of 1000.
+   * NOTE(review): the Bug46921_ name prefix presumably keeps the runner from
+   * treating this method as a test - confirm.
+   */
+  public void Bug46921_testSerialReplicatedWanWithPersistence() {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 1000;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, true,
+        null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName,
+        entryCount });
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Attaches one replicated region to two concurrent senders ("lnSerial1"
+   * created with remote id 2, "lnSerial2" with remote id 3), runs doPuts for
+   * 1000 entries on vm4 and expects a region size of 1000 on vm2 (receiver
+   * created with the first remote locator port) and on vm3 (receiver created
+   * with the second remote locator port).
+   */
+  public void testReplicatedSerialPropagationToTwoWanSites() throws Exception {
+    final String firstSender = "lnSerial1";
+    final String secondSender = "lnSerial2";
+    final String bothSenders = "lnSerial1,lnSerial2";
+    final String regionName = testName + "_RR";
+    final int entryCount = 1000;
+
+    // The first locator is created in this JVM; the remote ones in vm0/vm1.
+    final Integer localPort = createFirstLocatorWithDSId(1);
+    final Integer firstRemotePort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+    final Integer secondRemotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 3, localPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { firstRemotePort });
+    vm3.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { secondRemotePort });
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] firstSenderArgs = { firstSender, 2, false, 100, 10, false,
+        false, null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", firstSenderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", firstSenderArgs);
+
+    final Object[] secondSenderArgs = { secondSender, 3, false, 100, 10, false,
+        false, null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", secondSenderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", secondSenderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startFirst = { firstSender };
+    vm4.invoke(WANTestBase.class, "startSender", startFirst);
+    vm5.invoke(WANTestBase.class, "startSender", startFirst);
+
+    final Object[] startSecond = { secondSender };
+    vm4.invoke(WANTestBase.class, "startSender", startSecond);
+    vm5.invoke(WANTestBase.class, "startSender", startSecond);
+
+    // Sending members: region created with both sender ids.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, bothSenders, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, bothSenders, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, bothSenders, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, bothSenders, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName,
+        entryCount });
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Starts doPuts for 10000 entries on vm5 asynchronously, pauses 2000 ms,
+   * invokes killSender on vm4 asynchronously, waits for both invocations and
+   * then expects a region size of 10000 on vm2 and vm3.
+   */
+  public void testReplicatedSerialPropagationHA() throws Exception {
+    addExpectedException("Broken pipe");
+    addExpectedException("Connection reset");
+    addExpectedException("Unexpected IOException");
+
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 10000;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, false,
+        null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    // Puts run on vm5 while killSender is invoked on vm4 two seconds later.
+    final AsyncInvocation putsInvocation = vm5.invokeAsync(WANTestBase.class,
+        "doPuts", new Object[] { regionName, entryCount });
+    pause(2000);
+    final AsyncInvocation killInvocation = vm4.invokeAsync(WANTestBase.class,
+        "killSender");
+
+    putsInvocation.join();
+    killInvocation.join();
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Runs doPuts for 1000 entries on vm4 against a replicated region attached
+   * to concurrent sender "ln", created here with arguments
+   * (..., 100, 1000, true, false, ...), and expects a region size of 1000 on
+   * vm2 and vm3.
+   */
+  public void testReplicatedSerialPropagationWithConflation() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 1000;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 1000, true, false,
+        null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName,
+        entryCount });
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Runs doMultiThreadedPuts with 1000 on vm4 against a replicated region
+   * attached to concurrent sender "ln" (OrderPolicy.THREAD) and expects a
+   * region size of 1000 on vm2 and vm3.
+   */
+  public void testReplicatedSerialPropagationWithParallelThreads()
+      throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 1000;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, false,
+        null, true, 4, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doMultiThreadedPuts", new Object[] {
+        regionName, entryCount });
+
+    final Object[] expectedSize = { regionName, entryCount };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+
+  /**
+   * Creates concurrent sender "ln" with a MyGatewayEventFilter on vm4 and vm5,
+   * uses partitioned regions named after the test, runs doPuts for 1000
+   * entries on vm4 and expects a region size of 800 on vm2.
+   * NOTE(review): the 1000 vs. 800 difference is presumably the filter's
+   * doing - confirm against MyGatewayEventFilter.
+   */
+  public void testSerialPropogationWithFilter() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    // A fresh filter instance is passed for each sender, as before.
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        senderId, 2, false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        senderId, 2, false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD });
+
+    // Sending members: partitioned region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Receiving members: partitioned region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        regionName, 800 });
+  }
+
+  /**
+   * Creates concurrent sender "ln" with a MyGatewayEventFilter on vm4 and vm5,
+   * uses replicated regions named after the test, runs doPuts for 1000
+   * entries on vm4 and expects a region size of 800 on vm2 and vm3.
+   * NOTE(review): the 1000 vs. 800 difference is presumably the filter's
+   * doing - confirm against MyGatewayEventFilter.
+   */
+  public void testReplicatedSerialPropagationWithFilter() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    // A fresh filter instance is passed for each sender, as before.
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        senderId, 2, false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD });
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", new Object[] {
+        senderId, 2, false, 100, 10, false, false,
+        new MyGatewayEventFilter(), true, 5, OrderPolicy.THREAD });
+
+    // Receiving members: region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Sending members: region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, senderId, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    final Object[] expectedSize = { regionName, 800 };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedSize);
+  }
+  
+  /**
+   * Creates regions via createNormalRegion on vm4 and vm5, runs doPuts for
+   * 1000 entries on vm5 and then checks: queue stats ("ln", 0, 0, 0, 0) on
+   * vm4 and ("ln", 0, 1000, 0, 0) on vm5, region size 1000 on vm5, region
+   * size 0 on vm4 and on the receiving member vm2, and receiver stats
+   * (0, 0, 0) on vm2.
+   */
+  public void testNormalRegionSerialPropagation() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_RR";
+    final int entryCount = 1000;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver",
+        new Object[] { remotePort });
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, false,
+        null, true, 5, OrderPolicy.THREAD };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderArgs);
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    // vm4's sender is started half a second before vm5's.
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    pause(500);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    final Object[] normalRegionArgs = { regionName, senderId };
+    vm4.invoke(WANTestBase.class, "createNormalRegion", normalRegionArgs);
+    vm5.invoke(WANTestBase.class, "createNormalRegion", normalRegionArgs);
+
+    vm5.invoke(WANTestBase.class, "doPuts", new Object[] { regionName,
+        entryCount });
+
+    vm4.invoke(WANTestBase.class, "checkQueueStats", new Object[] { senderId,
+        0, 0, 0, 0 });
+    vm5.invoke(WANTestBase.class, "checkQueueStats", new Object[] { senderId,
+        0, entryCount, 0, 0 });
+
+    vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        regionName, entryCount });
+
+    final Object[] expectedEmpty = { regionName, 0 };
+    vm4.invoke(WANTestBase.class, "validateRegionSize", expectedEmpty);
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedEmpty);
+
+    vm2.invoke(WANTestBase.class, "checkGatewayReceiverStats",
+        new Object[] { 0, 0, 0 });
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
new file mode 100644
index 0000000..057f89b
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/disttx/DistTXWANDUnitTest.java
@@ -0,0 +1,194 @@
+package com.gemstone.gemfire.internal.cache.wan.disttx;
+
+import com.gemstone.gemfire.CancelException;
+import com.gemstone.gemfire.cache.CacheClosedException;
+import com.gemstone.gemfire.internal.cache.ForceReattemptException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+import dunit.SerializableCallable;
+
+public class DistTXWANDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Creates the test case.
+   *
+   * @param name the test name, passed unchanged to the WANTestBase constructor
+   */
+  public DistTXWANDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the superclass setup, then hands invokeInEveryVM a callable that
+   * sets the "gemfire.log-level" system property to getDUnitLogLevel().
+   *
+   * @throws Exception if the superclass setup or the invocation fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    this.invokeInEveryVM(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.log-level", getDUnitLogLevel());
+        return null;
+      }
+    });
+  }
+  
+
+  /*
+   * Disabled because it hangs with current implementation of notifying
+   * adjunct receivers by sending DistTXAdjunctCommitMessage from primary at
+   * the time of commit.
+   *
+   * Flow: sender "ln" is created on vm4 and vm5, doDistTXPuts for 50 entries
+   * runs on vm4 (one of the sender members), and vm2 is expected to report a
+   * region size of 50.
+   */
+  public void DISABLED_testPartitionedSerialPropagation_SenderSameAsCoordinator()
+      throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_PR";
+    final int entryCount = 50;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, false,
+        null, true };
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    // Sending members: partitioned region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Receiving members: partitioned region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { regionName,
+        entryCount });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        regionName, entryCount });
+  }
+
+  /**
+   * Sender "ln" is created on vm4 and vm5 only; doDistTXPuts for 50 entries
+   * runs on vm6 (a member without the sender), and vm2 is expected to report
+   * a region size of 50.
+   */
+  public void testPartitionedSerialPropagation_SenderNotSameAsCoordinator()
+      throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_PR";
+    final int entryCount = 50;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, false, 100, 10, false, false,
+        null, true };
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    // Sending members: partitioned region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+
+    final Object[] startArgs = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", startArgs);
+    vm5.invoke(WANTestBase.class, "startSender", startArgs);
+
+    // Receiving members: partitioned region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+
+    vm6.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { regionName,
+        entryCount });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        regionName, entryCount });
+  }
+
+  
+  /**
+   * Creates and starts sender "ln" (third createSender argument true) on
+   * vm4..vm7, waits for the running state on each, runs doDistTXPuts for 5
+   * entries on vm4, checks validateParallelSenderQueueAllBucketsDrained on
+   * all four members and expects a region size of 5 on vm2.
+   */
+  public void testPartitionedRegionParallelPropagation() throws Exception {
+    final String senderId = "ln";
+    final String regionName = testName + "_PR";
+    final int entryCount = 5;
+
+    // Start the two locators; the second one is handed the first one's port.
+    final Integer localPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer remotePort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, localPort });
+
+    final Object[] receiverArgs = { remotePort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    final Object[] cacheArgs = { localPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    final Object[] senderArgs = { senderId, 2, true, 100, 10, false, false,
+        null, true };
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm6.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm7.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    // Sending members: partitioned region created with the sender id.
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, senderId, 1, 100, isOffHeap() });
+
+    // The same single-element argument list serves every per-sender helper.
+    final Object[] senderIdOnly = { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", senderIdOnly);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdOnly);
+    vm6.invoke(WANTestBase.class, "startSender", senderIdOnly);
+    vm7.invoke(WANTestBase.class, "startSender", senderIdOnly);
+
+    // Receiving members: partitioned region created with a null sender id.
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        regionName, null, 1, 100, isOffHeap() });
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdOnly);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdOnly);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdOnly);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdOnly);
+
+    vm4.invoke(WANTestBase.class, "doDistTXPuts", new Object[] { regionName,
+        entryCount });
+
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdOnly);
+    vm5.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdOnly);
+    vm6.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdOnly);
+    vm7.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdOnly);
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        regionName, entryCount });
+  }
+
+  /**
+   * Intentionally empty test method.
+   * NOTE(review): presumably a placeholder so the class always contains at
+   * least one passing test - confirm before removing.
+   */
+  public void testDummy() {
+  }
+  
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
new file mode 100644
index 0000000..eea4f81
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderDUnitTest.java
@@ -0,0 +1,476 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender;
+import com.gemstone.gemfire.internal.cache.BucketRegion;
+import com.gemstone.gemfire.internal.cache.PartitionedRegion;
+import com.gemstone.gemfire.internal.cache.RegionQueue;
+import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+import 
com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import 
com.gemstone.gemfire.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import 
com.gemstone.gemfire.internal.cache.wan.parallel.ParallelGatewaySenderQueue;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+
+import java.util.Set;
+
+/**
+ * @author skumar
+ *
+ */
+public class CommonParallelGatewaySenderDUnitTest extends WANTestBase {
+
+  public CommonParallelGatewaySenderDUnitTest(String name ){ // name is forwarded unchanged to the WANTestBase constructor
+    super(name);
+  }
+  
+  /**
+   * Attaches the parallel sender "ln" to one partitioned region on vm4 and
+   * then tries to attach the same sender to a second partitioned region on
+   * the same member. The second creation must fail with an exception whose
+   * cause is an {@link IllegalStateException} whose message contains
+   * "cannot have the same parallel gateway sender id".
+   *
+   * @throws Exception if any of the set-up invocations fails unexpectedly
+   */
+  public void testSameSenderWithNonColocatedRegions() throws Exception {
+    addExpectedException("cannot have the same parallel");
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", "ln", 1, 100, isOffHeap() });
+    try {
+      vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", "ln", 1, 100, isOffHeap() });
+      fail("Expected IllegalStateException : cannot have the same parallel gateway sender");
+    }
+    catch (Exception e) {
+      // Inspect the cause defensively: a missing cause or a null message must be
+      // reported as "wrong exception", not mask the real failure with an NPE.
+      Throwable cause = e.getCause();
+      String causeMessage = (cause == null) ? null : cause.getMessage();
+      boolean isExpectedFailure = cause instanceof IllegalStateException
+          && causeMessage != null
+          && causeMessage.contains("cannot have the same parallel gateway sender id");
+      if (!isExpectedFailure) {
+        fail("Expected IllegalStateException", e);
+      }
+    }
+  }
+  
+  /**
+   * Simple scenario. Two regions attach the same PGS
+   * @throws Exception
+   * Below test is disabled intentionally
+    1> In this release 8.0, for rolling upgrade support the queue name is changed to the old style
+    2> Common parallel sender for different non-colocated regions is not supported in 8.0, so
+      there is no need to bother about ParallelGatewaySenderQueue#convertPathToName
+    3> We have to enable it in the next release
+    4> Version-based rolling upgrade support should be provided. Based on the version of
+      GemFire, QSTRING should be used between 8.0 and versions prior to 8.0
+   */
+  public void DISABLED_testParallelPropagation() throws Exception {
+    final String senderId = "ln";
+    final String firstRegion = testName + "_PR1";
+    final String secondRegion = testName + "_PR2";
+
+    // Bring up the two locators; the second one is handed the first one's port.
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // Receivers on vm2 and vm3 are created against the second locator's port.
+    final Object[] receiverArgs = new Object[] { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    // Caches on vm4..vm7 are created against the first locator's port.
+    final Object[] cacheArgs = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    // The identical sender definition is created on each of vm4..vm7.
+    final Object[] senderArgs = new Object[] { senderId, 2,
+        true, 100, 10, false, false, null, true };
+    vm4.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm5.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm6.invoke(WANTestBase.class, "createSender", senderArgs);
+    vm7.invoke(WANTestBase.class, "createSender", senderArgs);
+
+    // Both regions are created with the same sender id on vm4..vm7.
+    final Object[] firstSendingRegionArgs = new Object[] {
+        firstRegion, senderId, 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", firstSendingRegionArgs);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", firstSendingRegionArgs);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", firstSendingRegionArgs);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", firstSendingRegionArgs);
+
+    final Object[] secondSendingRegionArgs = new Object[] {
+        secondRegion, senderId, 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", secondSendingRegionArgs);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", secondSendingRegionArgs);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", secondSendingRegionArgs);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", secondSendingRegionArgs);
+
+    final Object[] senderIdArgs = new Object[] { senderId };
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    // vm2 and vm3 get the same two regions, created with a null sender id.
+    final Object[] firstReceivingRegionArgs = new Object[] {
+        firstRegion, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", firstReceivingRegionArgs);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", firstReceivingRegionArgs);
+    final Object[] secondReceivingRegionArgs = new Object[] {
+        secondRegion, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", secondReceivingRegionArgs);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", secondReceivingRegionArgs);
+
+    // Wait for every sender to be running before the first put so that
+    // no event is lost.
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { firstRegion, 1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { secondRegion, 1000 });
+
+    // Every sending member must see all of its local queue buckets drained.
+    vm4.invoke(CommonParallelGatewaySenderDUnitTest.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdArgs);
+    vm5.invoke(CommonParallelGatewaySenderDUnitTest.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdArgs);
+    vm6.invoke(CommonParallelGatewaySenderDUnitTest.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdArgs);
+    vm7.invoke(CommonParallelGatewaySenderDUnitTest.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderIdArgs);
+
+    // Both regions are expected to hold 1000 entries when checked from vm2.
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { firstRegion, 1000 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { secondRegion, 1000 });
+  }
+  
+  /**
+   * The PGS is persistence enabled but not the Regions
+   * Below test is disabled intentionally
+    1> In this release 8.0, for rolling upgrade support the queue name is changed to the old style
+    2> Common parallel sender for different non-colocated regions is not supported in 8.0, so
+      there is no need to bother about ParallelGatewaySenderQueue#convertPathToName
+    3> We have to enable it in the next release
+    4> Version-based rolling upgrade support should be provided. Based on the version of
+      GemFire, QSTRING should be used between 8.0 and versions prior to 8.0
+   */
+  public void DISABLED_testParallelPropagationPersistenceEnabled() throws 
Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, true, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, true, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, true, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, true, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", "ln", 1, 100, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", "ln", 1, 100, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", "ln", 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR1", null, 1, 100, isOffHeap() });
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR2", null, 1, 100, isOffHeap() });
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR1",
+        1000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR2",
+      1000 });
+    
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(CommonParallelGatewaySenderDUnitTest.class, 
"validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+    vm5.invoke(CommonParallelGatewaySenderDUnitTest.class, 
"validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+    vm6.invoke(CommonParallelGatewaySenderDUnitTest.class, 
"validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+    vm7.invoke(CommonParallelGatewaySenderDUnitTest.class, 
"validateParallelSenderQueueAllBucketsDrained", new Object[] {"ln"});
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR1", 1000 });
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR2", 1000 });
+  }
+  
+  
+  /**
+   * Enable persistence for GatewaySender.
+   * Pause the sender and do some puts in local region.
+   * Close the local site and rebuild the region and sender from disk store.
+   * Dispatcher should not start dispatching events recovered from persistent sender.
+   * Check if the remote site receives all the events.
+   * Below test is disabled intentionally
+    1> In this release 8.0, for rolling upgrade support the queue name is changed to the old style
+    2> Common parallel sender for different non-colocated regions is not supported in 8.0, so
+      there is no need to bother about ParallelGatewaySenderQueue#convertPathToName
+    3> We have to enable it in the next release
+    4> Version-based rolling upgrade support should be provided. Based on the version of
+      GemFire, QSTRING should be used between 8.0 and versions prior to 8.0
+   */
+  public void DISABLED_testPRWithGatewaySenderPersistenceEnabled_Restart() {
+    final String firstRegion = testName + "PR1";
+    final String secondRegion = testName + "PR2";
+    final Object[] senderIdArgs = new Object[] { "ln" };
+
+    //create locator on local site
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    //create locator on remote site
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    //create receiver on remote site
+    final Object[] receiverArgs = new Object[] { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+    vm3.invoke(WANTestBase.class, "createReceiver", receiverArgs);
+
+    //create cache in local site
+    final Object[] cacheArgs = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    //create senders with disk store; the ninth argument is null here and is
+    //replaced by the returned disk store name when the sender is re-created below
+    final Object[] newSenderArgs = new Object[] {
+        "ln", 2, true, 100, 10, false, true, null, null, true };
+    String diskStore1 = (String) vm4.invoke(WANTestBase.class,
+        "createSenderWithDiskStore", newSenderArgs);
+    String diskStore2 = (String) vm5.invoke(WANTestBase.class,
+        "createSenderWithDiskStore", newSenderArgs);
+    String diskStore3 = (String) vm6.invoke(WANTestBase.class,
+        "createSenderWithDiskStore", newSenderArgs);
+    String diskStore4 = (String) vm7.invoke(WANTestBase.class,
+        "createSenderWithDiskStore", newSenderArgs);
+
+    getLogWriter().info("The DS are: " + diskStore1 + "," + diskStore2 + ","
+        + diskStore3 + "," + diskStore4);
+
+    //create first PR on remote site
+    final Object[] firstRemoteRegionArgs = new Object[] {
+        firstRegion, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", firstRemoteRegionArgs);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", firstRemoteRegionArgs);
+
+    //create second PR on remote site
+    final Object[] secondRemoteRegionArgs = new Object[] {
+        secondRegion, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", secondRemoteRegionArgs);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", secondRemoteRegionArgs);
+
+    //create first PR on local site
+    final Object[] firstLocalRegionArgs = new Object[] {
+        firstRegion, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", firstLocalRegionArgs);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", firstLocalRegionArgs);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", firstLocalRegionArgs);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", firstLocalRegionArgs);
+
+    //create second PR on local site
+    final Object[] secondLocalRegionArgs = new Object[] {
+        secondRegion, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", secondLocalRegionArgs);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", secondLocalRegionArgs);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", secondLocalRegionArgs);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", secondLocalRegionArgs);
+
+    //start the senders on local site
+    vm4.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "startSender", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "startSender", senderIdArgs);
+
+    //wait for senders to become running
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+
+    //pause the senders
+    vm4.invoke(WANTestBase.class, "pauseSender", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "pauseSender", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "pauseSender", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "pauseSender", senderIdArgs);
+
+    //start puts in region on local site
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { firstRegion, 3000 });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { secondRegion, 5000 });
+    getLogWriter().info("Completed puts in the region");
+
+    //--------------------close and rebuild local site --------------------
+    //kill the senders
+    vm4.invoke(WANTestBase.class, "killSender", new Object[] {});
+    vm5.invoke(WANTestBase.class, "killSender", new Object[] {});
+    vm6.invoke(WANTestBase.class, "killSender", new Object[] {});
+    vm7.invoke(WANTestBase.class, "killSender", new Object[] {});
+
+    getLogWriter().info("Killed all the senders.");
+
+    //restart the vm
+    vm4.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm5.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm6.invoke(WANTestBase.class, "createCache", cacheArgs);
+    vm7.invoke(WANTestBase.class, "createCache", cacheArgs);
+
+    getLogWriter().info("Created back the cache");
+
+    //create senders with disk store, this time naming each member's own disk store
+    vm4.invoke(WANTestBase.class, "createSenderWithDiskStore",
+        new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore1, true });
+    vm5.invoke(WANTestBase.class, "createSenderWithDiskStore",
+        new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore2, true });
+    vm6.invoke(WANTestBase.class, "createSenderWithDiskStore",
+        new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore3, true });
+    vm7.invoke(WANTestBase.class, "createSenderWithDiskStore",
+        new Object[] { "ln", 2, true, 100, 10, false, true, null, diskStore4, true });
+
+    getLogWriter().info("Created the senders back from the disk store.");
+
+    //create first PR on local site, on all four members concurrently
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", firstLocalRegionArgs);
+    AsyncInvocation inv2 = vm5.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", firstLocalRegionArgs);
+    AsyncInvocation inv3 = vm6.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", firstLocalRegionArgs);
+    AsyncInvocation inv4 = vm7.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", firstLocalRegionArgs);
+    joinRegionCreations(firstRegion, inv1, inv2, inv3, inv4);
+
+    //create second PR on local site, on all four members concurrently
+    inv1 = vm4.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", secondLocalRegionArgs);
+    inv2 = vm5.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", secondLocalRegionArgs);
+    inv3 = vm6.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", secondLocalRegionArgs);
+    inv4 = vm7.invokeAsync(WANTestBase.class,
+        "createPartitionedRegion", secondLocalRegionArgs);
+    joinRegionCreations(secondRegion, inv1, inv2, inv3, inv4);
+
+    getLogWriter().info("Created back the partitioned regions");
+
+    //start the senders in async mode. This will ensure that the
+    //node of shadow PR that went down last will come up first
+    vm4.invokeAsync(WANTestBase.class, "startSender", senderIdArgs);
+    vm5.invokeAsync(WANTestBase.class, "startSender", senderIdArgs);
+    vm6.invokeAsync(WANTestBase.class, "startSender", senderIdArgs);
+    vm7.invokeAsync(WANTestBase.class, "startSender", senderIdArgs);
+
+    getLogWriter().info("Waiting for senders running.");
+    //wait for senders running
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderIdArgs);
+
+    getLogWriter().info("All the senders are now running...");
+
+    //----------------------------------------------------------------------
+
+    //the remote site is expected to hold everything that was put while paused
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { firstRegion, 3000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { firstRegion, 3000 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { secondRegion, 5000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] { secondRegion, 5000 });
+  }
+
+  /**
+   * Joins the given asynchronous region-creation invocations in order. If the
+   * calling thread is interrupted while waiting, the interrupt flag is restored
+   * and the test fails with the interruption attached as the cause.
+   *
+   * @param regionName name of the region being created, used in the failure message
+   * @param invocations the asynchronous invocations to wait for
+   */
+  private void joinRegionCreations(String regionName, AsyncInvocation... invocations) {
+    try {
+      for (AsyncInvocation invocation : invocations) {
+        invocation.join();
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      fail("Interrupted while waiting for creation of region " + regionName, e);
+    }
+  }
+  
+  /**
+   * Looks up the gateway sender with the given id in this member's cache and
+   * waits until every local bucket of every region returned by the sender's
+   * queue has an empty key set.
+   *
+   * Fails immediately if the cache has no sender with that id, instead of
+   * running into a NullPointerException further down.
+   *
+   * @param senderId id of the gateway sender whose queue buckets are checked
+   */
+  public static void validateParallelSenderQueueAllBucketsDrained(final String senderId) {
+    Set<GatewaySender> senders = cache.getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+    if (sender == null) {
+      fail("No gateway sender with id " + senderId + " found in this cache");
+    }
+    // Only the first queue of the sender is inspected.
+    ConcurrentParallelGatewaySenderQueue regionQueue =
+        (ConcurrentParallelGatewaySenderQueue)((AbstractGatewaySender)sender)
+            .getQueues().toArray(new RegionQueue[1])[0];
+
+    Set<PartitionedRegion> shadowPRs = (Set<PartitionedRegion>)regionQueue.getRegions();
+
+    for (PartitionedRegion shadowPR : shadowPRs) {
+      Set<BucketRegion> buckets = shadowPR.getDataStore().getAllLocalBucketRegions();
+
+      for (final BucketRegion bucket : buckets) {
+        WaitCriterion wc = new WaitCriterion() {
+          public boolean done() {
+            if (bucket.keySet().size() == 0) {
+              getLogWriter().info("Bucket " + bucket.getId() + " is empty");
+              return true;
+            }
+            return false;
+          }
+
+          public String description() {
+            return "Expected bucket entries for bucket: " + bucket.getId()
+              + " is: 0 but actual entries: " + bucket.keySet().size()
+              + " This bucket isPrimary: " + bucket.getBucketAdvisor().isPrimary()
+              + " KEYSET: " + bucket.keySet();
+          }
+        };
+        // NOTE(review): arguments are presumably timeout (ms), poll interval (ms)
+        // and throw-on-timeout - confirm against DistributedTestCase.
+        DistributedTestCase.waitForCriterion(wc, 180000, 50, true);
+      }
+    }
+  }
+  
+}


Reply via email to