http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
new file mode 100644
index 0000000..0ee0b36
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/concurrent/ConcurrentParallelGatewaySenderOperation_1_DUnitTest.java
@@ -0,0 +1,846 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.concurrent;
+
+import com.gemstone.gemfire.cache.wan.GatewaySender.OrderPolicy;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * @author skumar
+ *
+ */
+public class ConcurrentParallelGatewaySenderOperation_1_DUnitTest extends 
WANTestBase {
+  private static final long serialVersionUID = 1L;
+  
+  /**
+   * Creates the test case; the given name is handed unchanged to the
+   * superclass constructor.
+   *
+   * @param name the name passed on to the superclass constructor
+   */
+  public ConcurrentParallelGatewaySenderOperation_1_DUnitTest(String name) {
+    super(name);
+  }
+  
+  /**
+   * Runs the superclass set-up and then registers, through
+   * {@code addExpectedException}, the I/O related messages used by the tests
+   * in this class.
+   *
+   * @throws Exception if the superclass set-up throws
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    addExpectedException("Broken pipe");
+    addExpectedException("Connection reset");
+    addExpectedException("Unexpected IOException");
+  }
+  
+  /**
+   * Creates concurrent senders named "ln" on vm4-vm7 without ever calling
+   * {@code startSender}, does puts from vm4, and then checks that every sender
+   * passes {@code verifySenderStoppedState} and that the regions on vm2 and
+   * vm3 are validated with an expected size of 0.
+   */
+  public void testParallelGatewaySenderWithoutStarting() {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    final Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    // receivers are created against the port returned for the second locator
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    // sending-side caches are created against the port of the first locator
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 6, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    // regions on the sending members are attached to sender "ln"
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    // regions on the receiving members are created without a sender id
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    // the senders were never started
+    vm4.invoke(WANTestBase.class, "verifySenderStoppedState", senderId);
+    vm5.invoke(WANTestBase.class, "verifySenderStoppedState", senderId);
+    vm6.invoke(WANTestBase.class, "verifySenderStoppedState", senderId);
+    vm7.invoke(WANTestBase.class, "verifySenderStoppedState", senderId);
+
+    // nothing is expected on the receiving members
+    final Object[] expectEmpty = { regionName, 0 };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectEmpty);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectEmpty);
+  }
+  
+  /**
+   * Defect 44323 (ParallelGatewaySender should not be started on Accessor node).
+   *
+   * vm4 and vm5 create the region with {@code createPartitionedRegion}, while
+   * vm6 and vm7 create it with {@code createPartitionedRegionAsAccessor}. The
+   * sender is started on all four members; the test waits for the running
+   * state on vm6 and vm7, does puts from vm4, validates that the queues on
+   * vm4 and vm5 are drained, and validates a size of 1000 on vm2 and vm3.
+   */
+  public void testParallelGatewaySenderStartOnAccessorNode() {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    final Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 7, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    // vm4/vm5 create the region normally, vm6/vm7 as accessors
+    final Object[] dataRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", dataRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", dataRegion);
+    final Object[] accessorRegion = { regionName, "ln", 1, 100 };
+    vm6.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", accessorRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor", accessorRegion);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    // start the senders on every member, accessors included
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    pause(2000);
+
+    // only the accessor members are checked for the running state here
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    final Object[] expectAll = { regionName, 1000 };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectAll);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectAll);
+  }
+
+  
+  /**
+   * Normal scenario in which the sender is paused in between.
+   *
+   * After a first run of puts with running senders, all senders are paused and
+   * a second run of puts is started in the background. The region on vm2 is
+   * then checked with {@code validateRegionSizeRemainsSame} against the number
+   * of puts of the first run. The background puts are joined before the test
+   * returns so that they cannot outlive the test method.
+   *
+   * @throws Exception if waiting for the background puts is interrupted
+   */
+  public void testParallelPropagationSenderPause() throws Exception {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 5, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //FIRST RUN: now, the senders are started. So, start the puts
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 100 });
+
+    //now, pause all of the senders
+    vm4.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm5.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm6.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm7.invoke(WANTestBase.class, "pauseSender", senderId);
+    pause(2000);
+
+    //SECOND RUN: keep one thread doing puts to the region
+    AsyncInvocation secondRun = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    //verify region size remains on remote vm and is restricted below a
+    //specified limit (i.e. number of puts in the first run)
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame",
+        new Object[] { regionName, 100 });
+
+    //do not leave the background puts running once the test method returns
+    secondRun.join();
+  }
+
+  /**
+   * Normal scenario in which a paused sender is resumed.
+   *
+   * Puts are started in the background, all senders are paused and then
+   * resumed, and the test finally validates that the queues on vm4-vm7 are
+   * drained and that vm2 reports 1000 entries. The background puts are joined
+   * before the drain validation so that no new events can be enqueued while
+   * the queues are being checked.
+   *
+   * @throws Exception if waiting for the background puts is interrupted
+   */
+  public void testParallelPropagationSenderResume() throws Exception {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 8, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //now, the senders are started. So, start the puts
+    AsyncInvocation puts = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    //now, pause all of the senders
+    vm4.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm5.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm6.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm7.invoke(WANTestBase.class, "pauseSender", senderId);
+
+    //sleep for a second or two
+    pause(2000);
+
+    //resume the senders
+    vm4.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm5.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm6.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm7.invoke(WANTestBase.class, "resumeSender", senderId);
+
+    pause(2000);
+
+    //all puts must be finished before the queues can be expected to stay drained
+    puts.join();
+
+    vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    //find the region size on remote vm
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { regionName, 1000 });
+  }
+  
+  /**
+   * Negative scenario in which a sender that is stopped (and not paused) is
+   * resumed.
+   * Expected: resume is only valid for pause. If a sender which is stopped is
+   * resumed, it will not be started again.
+   *
+   * The test uses two sending members (vm4, vm5). It does 100 puts, validates
+   * on vm4 that the queue contents are 0, stops and then "resumes" both
+   * senders, does more puts, and validates that vm2 still reports 100 entries.
+   *
+   * @throws Exception declared by the test; no checked exception is thrown by
+   *         the statements visible in this method
+   */
+  public void testParallelPropagationSenderResumeNegativeScenario() throws Exception {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    final Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    final Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 4, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+
+    //wait till the senders are running
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //start the puts
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 100 });
+
+    //let the queue drain completely
+    vm4.invoke(WANTestBase.class, "validateQueueContents", new Object[] { "ln", 0 });
+
+    //stop the senders
+    vm4.invoke(WANTestBase.class, "stopSender", senderId);
+    vm5.invoke(WANTestBase.class, "stopSender", senderId);
+
+    //now, try to resume a stopped sender
+    vm4.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm5.invoke(WANTestBase.class, "resumeSender", senderId);
+
+    //do more puts
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    //validate region size on remote vm to contain only the events put in
+    //local site before the senders are stopped.
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { regionName, 100 });
+  }
+
+  /**
+   * Normal scenario in which a sender is stopped.
+   *
+   * After a first run of puts with running senders, all senders are stopped
+   * and a second run of puts is started in the background. The region on vm2
+   * is then checked with {@code validateRegionSizeRemainsSame} against the
+   * number of puts of the first run. The background puts are joined before
+   * the test returns so that they cannot outlive the test method.
+   *
+   * @throws Exception if waiting for the background puts is interrupted
+   */
+  public void testParallelPropagationSenderStop() throws Exception {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 3, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //FIRST RUN: now, the senders are started. So, do some of the puts
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 100 });
+
+    //now, stop all of the senders
+    vm4.invoke(WANTestBase.class, "stopSender", senderId);
+    vm5.invoke(WANTestBase.class, "stopSender", senderId);
+    vm6.invoke(WANTestBase.class, "stopSender", senderId);
+    vm7.invoke(WANTestBase.class, "stopSender", senderId);
+
+    //SECOND RUN: keep one thread doing puts
+    AsyncInvocation secondRun = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    //verify region size remains on remote vm and is restricted below a
+    //specified limit (number of puts in the first run)
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame",
+        new Object[] { regionName, 100 });
+
+    //do not leave the background puts running once the test method returns
+    secondRun.join();
+  }
+
+  /**
+   * Normal scenario in which a sender is stopped and then started again.
+   *
+   * Three runs of puts are done: one with running senders, one after the
+   * senders are stopped, and one after they are started again. After the
+   * third run the queues on vm4-vm7 must be drained, vm2 must report 1000
+   * entries, and the queue size stat is validated as 0 on every sender.
+   *
+   * @throws Throwable whatever {@code AsyncInvocation.getResult} throws while
+   *         waiting for the senders to start
+   */
+  public void testParallelPropagationSenderStartAfterStop() throws Throwable {
+    final String regionName = testName + "_PR";
+    final Object[] senderId = { "ln" };
+    final Object[] emptyQueueStat = { "ln", 0 };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    final Object[] nyLocator = { nyPort };
+    vm2.invoke(WANTestBase.class, "createReceiver", nyLocator);
+    vm3.invoke(WANTestBase.class, "createReceiver", nyLocator);
+
+    final Object[] lnLocator = { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm5.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm6.invoke(WANTestBase.class, "createCache", lnLocator);
+    vm7.invoke(WANTestBase.class, "createCache", lnLocator);
+
+    final Object[] senderConfig = { "ln", 2, true, 100, 10, false, false, null,
+        true, 4, OrderPolicy.KEY };
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    final Object[] sendingRegion = { regionName, "ln", 1, 100, isOffHeap() };
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", sendingRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    final Object[] receivingRegion = { regionName, null, 1, 100, isOffHeap() };
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receivingRegion);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //FIRST RUN: now, the senders are started. So, do some of the puts
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 200 });
+
+    //now, stop all of the senders
+    vm4.invoke(WANTestBase.class, "stopSender", senderId);
+    vm5.invoke(WANTestBase.class, "stopSender", senderId);
+    vm6.invoke(WANTestBase.class, "stopSender", senderId);
+    vm7.invoke(WANTestBase.class, "stopSender", senderId);
+
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm6.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm7.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+
+    //SECOND RUN: do some of the puts after the senders are stopped
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    //Region size on remote site should remain same and below the number of
+    //puts done in the FIRST RUN
+    final Object[] firstRunLimit = { regionName, 200 };
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", firstRunLimit);
+
+    //start the senders again, all four concurrently
+    AsyncInvocation vm4start = vm4.invokeAsync(WANTestBase.class, "startSender", senderId);
+    AsyncInvocation vm5start = vm5.invokeAsync(WANTestBase.class, "startSender", senderId);
+    AsyncInvocation vm6start = vm6.invokeAsync(WANTestBase.class, "startSender", senderId);
+    AsyncInvocation vm7start = vm7.invokeAsync(WANTestBase.class, "startSender", senderId);
+    //upper bound handed to getResult for each start (presumably milliseconds)
+    final int startTimeout = 30000;
+    vm4start.getResult(startTimeout);
+    vm5start.getResult(startTimeout);
+    vm6start.getResult(startTimeout);
+    vm7start.getResult(startTimeout);
+
+    //Region size on remote site should remain same and below the number of
+    //puts done in the FIRST RUN
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame", firstRunLimit);
+
+    //THIRD RUN: do some more puts. Invoked synchronously so that a failure of
+    //the puts fails the test instead of being lost behind a bare join().
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { regionName, 1000 });
+
+    //verify all the buckets on all the sender nodes are drained
+    vm4.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm6.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm7.invoke(WANTestBase.class, "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    //verify the events propagate to remote site
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] { regionName, 1000 });
+
+    vm4.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm5.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm6.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+    vm7.invoke(WANTestBase.class, "validateQueueSizeStat", emptyQueueStat);
+  }
+
+  /**
+   * Normal scenario in which a sender is stopped and then started again.
+   * Differs from the preceding test case in that, while the senders are being
+   * restarted from the stopped state, another thread is simultaneously doing
+   * puts on the region.
+   *
+   * <p>NOTE(review): the method name does not begin with "test"; presumably
+   * it is parked under bug 47553 and is not picked up by the test runner --
+   * confirm before relying on it.
+   *
+   * @throws Exception if a remote invocation fails or the wait for the async
+   *         puts is interrupted
+   */
+  public void Bug47553_testParallelPropagationSenderStartAfterStop_Scenario2()
+      throws Exception {
+    // Argument lists shared by the per-member invocations below.
+    final String regionName = testName + "_PR";
+    final Object[] senderId = new Object[] { "ln" };
+    final Object[] senderConfig = new Object[] { "ln", 2, true, 100, 10,
+        false, false, null, true, 7, OrderPolicy.KEY };
+    final Object[] senderRegion = new Object[] { regionName, "ln", 1, 100,
+        isOffHeap() };
+    final Object[] receiverRegion = new Object[] { regionName, null, 1, 100,
+        isOffHeap() };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    final Object[] localLocator = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localLocator);
+    vm5.invoke(WANTestBase.class, "createCache", localLocator);
+    vm6.invoke(WANTestBase.class, "createCache", localLocator);
+    vm7.invoke(WANTestBase.class, "createCache", localLocator);
+
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+
+    //make sure all the senders are running before doing any puts
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    getLogWriter().info("All the senders are now started");
+
+    //FIRST RUN: now, the senders are started. So, do some of the puts
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 200 });
+
+    getLogWriter().info("Done few puts");
+
+    //now, stop all of the senders
+    vm4.invoke(WANTestBase.class, "stopSender", senderId);
+    vm5.invoke(WANTestBase.class, "stopSender", senderId);
+    vm6.invoke(WANTestBase.class, "stopSender", senderId);
+    vm7.invoke(WANTestBase.class, "stopSender", senderId);
+
+    getLogWriter().info("All the senders are stopped");
+    pause(2000);
+
+    //SECOND RUN: do some of the puts after the senders are stopped
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+    getLogWriter().info("Done some more puts in second run");
+
+    //Region size on remote site should remain same and below the number of
+    //puts done in the FIRST RUN
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame",
+        new Object[] { regionName, 200 });
+
+    //SECOND RUN: start async puts on region
+    AsyncInvocation async = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 5000 });
+    getLogWriter().info("Started high number of puts by async thread");
+
+    getLogWriter().info("Starting the senders at the same time");
+    //when puts are happening by another thread, start the senders
+    vm4.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm5.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm6.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm7.invokeAsync(WANTestBase.class, "startSender", senderId);
+
+    //The async start handles are not kept, so wait explicitly until every
+    //member reports its sender running; otherwise the drain checks below
+    //could run against a sender that has not (or never) come up.
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    getLogWriter().info("All the senders are started");
+
+    //NOTE(review): join() only waits for the async puts to finish; it does
+    //not inspect their outcome.
+    async.join();
+
+    pause(2000);
+
+    //verify all the buckets on all the sender nodes are drained
+    vm4.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm6.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm7.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    //verify that the queue size ultimately becomes zero. That means all the
+    //events propagate to remote site.
+    vm4.invoke(WANTestBase.class, "validateQueueContents",
+        new Object[] { "ln", 0 });
+  }
+  
+  /**
+   * Normal scenario in which a sender is stopped and then started again when
+   * two of the four local members (vm6, vm7) host the region only as
+   * accessors.
+   *
+   * @throws Throwable whatever the remote invocations, or
+   *         {@code getResult} on the asynchronous sender starts, raise
+   */
+  public void testParallelPropagationSenderStartAfterStopOnAccessorNode()
+      throws Throwable {
+    // Argument lists shared by the per-member invocations below.
+    final String regionName = testName + "_PR";
+    final Object[] senderId = new Object[] { "ln" };
+    final Object[] senderConfig = new Object[] { "ln", 2, true, 100, 10,
+        false, false, null, true, 4, OrderPolicy.KEY };
+    final Object[] storeRegion = new Object[] { regionName, "ln", 1, 100,
+        isOffHeap() };
+    final Object[] accessorRegion = new Object[] { regionName, "ln", 1, 100 };
+    final Object[] receiverRegion = new Object[] { regionName, null, 1, 100,
+        isOffHeap() };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    final Object[] localLocator = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localLocator);
+    vm5.invoke(WANTestBase.class, "createCache", localLocator);
+    vm6.invoke(WANTestBase.class, "createCache", localLocator);
+    vm7.invoke(WANTestBase.class, "createCache", localLocator);
+
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    //vm4 and vm5 host the region normally; vm6 and vm7 are accessors
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", storeRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", storeRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor",
+        accessorRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegionAsAccessor",
+        accessorRegion);
+
+    vm4.invoke(WANTestBase.class, "startSender", senderId);
+    vm5.invoke(WANTestBase.class, "startSender", senderId);
+    vm6.invoke(WANTestBase.class, "startSender", senderId);
+    vm7.invoke(WANTestBase.class, "startSender", senderId);
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+
+    //make sure the sender is running on every member, accessor nodes
+    //(vm6, vm7) as well as non-accessor nodes (vm4, vm5)
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    //FIRST RUN: now, the senders are started. So, do some of the puts
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 200 });
+
+    //now, stop all of the senders
+    vm4.invoke(WANTestBase.class, "stopSender", senderId);
+    vm5.invoke(WANTestBase.class, "stopSender", senderId);
+    vm6.invoke(WANTestBase.class, "stopSender", senderId);
+    vm7.invoke(WANTestBase.class, "stopSender", senderId);
+
+    pause(2000);
+
+    //SECOND RUN: do some of the puts after the senders are stopped
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+
+    //Region size on remote site should remain same and below the number of
+    //puts done in the FIRST RUN
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame",
+        new Object[] { regionName, 200 });
+
+    //start the senders again, all four at once, then wait for each start
+    AsyncInvocation vm4start = vm4.invokeAsync(WANTestBase.class,
+        "startSender", senderId);
+    AsyncInvocation vm5start = vm5.invokeAsync(WANTestBase.class,
+        "startSender", senderId);
+    AsyncInvocation vm6start = vm6.invokeAsync(WANTestBase.class,
+        "startSender", senderId);
+    AsyncInvocation vm7start = vm7.invokeAsync(WANTestBase.class,
+        "startSender", senderId);
+    final int startTimeout = 30000;
+    vm4start.getResult(startTimeout);
+    vm5start.getResult(startTimeout);
+    vm6start.getResult(startTimeout);
+    vm7start.getResult(startTimeout);
+
+    //Restarting the senders alone must not change the remote site: its
+    //region size is still checked against the FIRST RUN count
+    vm2.invoke(WANTestBase.class, "validateRegionSizeRemainsSame",
+        new Object[] { regionName, 200 });
+
+    //SECOND RUN: do some more puts. Invoked synchronously: the previous
+    //invokeAsync + immediate join() blocked just the same but never looked
+    //at the outcome of the puts.
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+    pause(5000);
+
+    //verify all buckets drained only on non-accessor nodes.
+    vm4.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    //verify the events propagate to remote site
+    vm2.invoke(WANTestBase.class, "validateRegionSize",
+        new Object[] { regionName, 1000 });
+  }
+
+  
+  /**
+   * Normal scenario in which a combination of start, pause and resume
+   * operations is tested: puts are done before the senders start, while they
+   * run, and concurrently with the resume after a pause.
+   *
+   * @throws Exception if a remote invocation fails or the wait for the async
+   *         puts is interrupted
+   */
+  public void testStartPauseResumeParallelGatewaySender() throws Exception {
+    // Argument lists shared by the per-member invocations below.
+    final String regionName = testName + "_PR";
+    final Object[] senderId = new Object[] { "ln" };
+    final Object[] senderConfig = new Object[] { "ln", 2, true, 100, 10,
+        false, false, null, true, 5, OrderPolicy.KEY };
+    final Object[] senderRegion = new Object[] { regionName, "ln", 1, 100,
+        isOffHeap() };
+    final Object[] receiverRegion = new Object[] { regionName, null, 1, 100,
+        isOffHeap() };
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    final Object[] localLocator = new Object[] { lnPort };
+    vm4.invoke(WANTestBase.class, "createCache", localLocator);
+    vm5.invoke(WANTestBase.class, "createCache", localLocator);
+    vm6.invoke(WANTestBase.class, "createCache", localLocator);
+    vm7.invoke(WANTestBase.class, "createCache", localLocator);
+
+    getLogWriter().info("Created cache on local site");
+
+    vm4.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm5.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm6.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+    vm7.invoke(WANTestBase.class, "createConcurrentSender", senderConfig);
+
+    getLogWriter().info("Created senders on local site");
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", senderRegion);
+
+    getLogWriter().info("Created PRs on local site");
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", receiverRegion);
+    getLogWriter().info("Created PRs on remote site");
+
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+    getLogWriter().info("Done 1000 puts on local site");
+
+    //Since puts are already done on userPR, it will have the buckets created.
+    //During sender start, it will wait until those buckets are created for
+    //shadowPR as well.
+    //Start the senders in async threads, so colocation of shadowPR will be
+    //complete and missing buckets will be created in
+    //PRHARedundancyProvider.createMissingBuckets().
+    vm4.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm5.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm6.invokeAsync(WANTestBase.class, "startSender", senderId);
+    vm7.invokeAsync(WANTestBase.class, "startSender", senderId);
+
+    //the async start handles are not kept; block here until each member
+    //reports its sender running
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+    vm7.invoke(WANTestBase.class, "waitForSenderRunningState", senderId);
+
+    getLogWriter().info("Started senders on local site");
+
+    vm4.invoke(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 5000 });
+    getLogWriter().info("Done 5000 puts on local site");
+
+    vm4.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm5.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm6.invoke(WANTestBase.class, "pauseSender", senderId);
+    vm7.invoke(WANTestBase.class, "pauseSender", senderId);
+    getLogWriter().info("Paused senders on local site");
+
+    vm4.invoke(WANTestBase.class, "verifySenderPausedState", senderId);
+    vm5.invoke(WANTestBase.class, "verifySenderPausedState", senderId);
+    vm6.invoke(WANTestBase.class, "verifySenderPausedState", senderId);
+    vm7.invoke(WANTestBase.class, "verifySenderPausedState", senderId);
+
+    //puts keep running in the background while the senders are resumed
+    AsyncInvocation inv1 = vm4.invokeAsync(WANTestBase.class, "doPuts",
+        new Object[] { regionName, 1000 });
+    getLogWriter().info("Started 1000 async puts on local site");
+
+    vm4.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm5.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm6.invoke(WANTestBase.class, "resumeSender", senderId);
+    vm7.invoke(WANTestBase.class, "resumeSender", senderId);
+    getLogWriter().info("Resumed senders on local site");
+
+    vm4.invoke(WANTestBase.class, "verifySenderResumedState", senderId);
+    vm5.invoke(WANTestBase.class, "verifySenderResumedState", senderId);
+    vm6.invoke(WANTestBase.class, "verifySenderResumedState", senderId);
+    vm7.invoke(WANTestBase.class, "verifySenderResumedState", senderId);
+
+    //An InterruptedException here propagates through the method's throws
+    //clause with its stack trace intact, instead of being printed and
+    //replaced by a cause-less fail().
+    inv1.join();
+
+    //verify all buckets drained on all sender nodes.
+    vm4.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm5.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm6.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+    vm7.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", senderId);
+
+    //both receivers are expected to report 5000 entries
+    final Object[] expectedRemoteSize = new Object[] { regionName, 5000 };
+    vm2.invoke(WANTestBase.class, "validateRegionSize", expectedRemoteSize);
+    vm3.invoke(WANTestBase.class, "validateRegionSize", expectedRemoteSize);
+  }
+}


Reply via email to