http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
new file mode 100644
index 0000000..228cdad
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationClientServerDUnitTest.java
@@ -0,0 +1,109 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * @author skumar
+ * 
+ */
+public class ParallelWANPropagationClientServerDUnitTest extends WANTestBase {
+  private static final long serialVersionUID = 1L;
+
+  public ParallelWANPropagationClientServerDUnitTest(String name) {
+    super(name);
+  }
+
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Normal happy scenario test case.
+   * 
+   * @throws Exception
+   */
+  public void testParallelPropagationWithClientServer() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiverAndServer", new Object[] { 
nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiverAndServer", new Object[] { 
nyPort });
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 100, isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createClientWithLocator", new Object[] {
+        nyPort, "localhost", testName + "_PR" });
+    vm4.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+      100 });
+
+    vm5.invoke(WANTestBase.class, "createServer", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createServer", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
+         100, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
+         100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 100, isOffHeap() });
+
+    vm7.invoke(WANTestBase.class, "createClientWithLocator", new Object[] {
+      lnPort, "localhost", testName + "_PR" });
+    
+    AsyncInvocation inv1 = vm5.invokeAsync(WANTestBase.class, "startSender", 
new Object[] { "ln" });
+    AsyncInvocation inv2 = vm6.invokeAsync(WANTestBase.class, "startSender", 
new Object[] { "ln" });
+
+    inv1.join();
+    inv2.join();
+    // before doing any puts, let the senders be running in order to ensure 
that
+    // not a single event will be lost
+    
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ln" });
+    
+    vm7.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+        10000 });
+
+    
+    // verify all buckets drained on all sender nodes.
+    vm5.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class,
+        "validateParallelSenderQueueAllBucketsDrained", new Object[] { "ln" });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 10000 });
+    vm3.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 10000 });
+
+    vm5.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 10000 });
+    vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 10000 });
+
+    vm7.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName + "_PR", 10000 });
+    
+    vm4.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName + "_PR", 10000 });
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
new file mode 100644
index 0000000..8eae664
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/parallel/ParallelWANPropagationConcurrentOpsDUnitTest.java
@@ -0,0 +1,282 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.parallel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+public class ParallelWANPropagationConcurrentOpsDUnitTest extends WANTestBase {
+
+  private static final long serialVersionUID = 1L;
+  
+  public ParallelWANPropagationConcurrentOpsDUnitTest(String name) {
+    super(name);
+  }
+  
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+  
+  /**
+   * Normal propagation scenario test case for a PR with only one bucket.
+   * This has been added for bug# 44284.
+   * @throws Exception
+   */
+  public void testParallelPropagationWithSingleBucketPR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ 
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+ 
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 1, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 1, isOffHeap() });
+ 
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ 
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 1, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 1, isOffHeap() });
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    
+    //pause the senders
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln" });
+    
+    pause(5000);
+    
+    AsyncInvocation async1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 700 });
+    AsyncInvocation async2 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 1000 });
+    AsyncInvocation async3 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 800 });
+    AsyncInvocation async4 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 1000 });
+    
+    async1.join();
+    async2.join();
+    async3.join();
+    async4.join();
+    
+    int queueSize = (Integer) vm4.invoke(WANTestBase.class, 
"getQueueContentSize", new Object[] { "ln" });
+    assertEquals("Actual queue size is not matching with the expected", 3500, 
queueSize);
+    
+    //resume the senders now
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln" });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  /**
+   * Normal propagation scenario test case for a PR with less number of 
buckets.
+   * Buckets have been kept to 10 for this test.
+   * This has been added for bug# 44287.
+   * @throws Exception
+   */
+  public void testParallelPropagationWithLowNumberofBuckets() throws Exception 
{
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+ 
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+ 
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 10, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 10, isOffHeap() });
+ 
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+ 
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 10, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 10, isOffHeap() });
+
+    //before doing any puts, let the senders be running in order to ensure that
+    //not a single event will be lost
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln" });
+    
+    AsyncInvocation async1 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 700 });
+    AsyncInvocation async2 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 1000 });
+    AsyncInvocation async3 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 800 });
+    AsyncInvocation async4 = vm4.invokeAsync(WANTestBase.class, "doPuts", new 
Object[] { testName + "_PR", 1000 });
+    
+    async1.join();
+    async2.join();
+    async3.join();
+    async4.join();
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+        testName + "_PR", 1000 });
+  }
+
+  public void testParalleQueueDrainInOrder_PR() throws Exception {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", null, 3, 4, isOffHeap() });
+  
+    vm2.invoke(WANTestBase.class, "addListenerOnRegion", new Object[] 
{testName + "_PR"});
+  
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm5.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm6.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm7.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm6.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+    vm7.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 3, 4, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 3, 4, isOffHeap() });
+    vm6.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 3, 4, isOffHeap() });
+    vm7.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 3, 4, isOffHeap() });
+    
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm6.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm7.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm4.invoke(WANTestBase.class, "addQueueListener", new Object[] { "ln", 
true});
+    vm5.invoke(WANTestBase.class, "addQueueListener", new Object[] { "ln", 
true});
+    vm6.invoke(WANTestBase.class, "addQueueListener", new Object[] { "ln", 
true});
+    vm7.invoke(WANTestBase.class, "addQueueListener", new Object[] { "ln", 
true});
+
+    pause(2000);
+    vm4.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln"});
+    vm5.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln"});
+    vm6.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln"});
+    vm7.invoke(WANTestBase.class, "pauseSender", new Object[] { "ln"});
+    
+    pause(2000);
+    
+    vm6.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+      4 });
+    vm4.invoke(WANTestBase.class, "addListenerOnBucketRegion", new Object[] 
{testName + "_PR", 4});
+    vm5.invoke(WANTestBase.class, "addListenerOnBucketRegion", new Object[] 
{testName + "_PR", 4});
+    vm6.invoke(WANTestBase.class, "addListenerOnBucketRegion", new Object[] 
{testName + "_PR", 4 });
+    vm7.invoke(WANTestBase.class, "addListenerOnBucketRegion", new Object[] 
{testName + "_PR", 4});
+    
+    vm4.invoke(WANTestBase.class, "addListenerOnQueueBucketRegion", new 
Object[] { "ln" , 4});
+    vm5.invoke(WANTestBase.class, "addListenerOnQueueBucketRegion", new 
Object[] { "ln" , 4});
+    vm6.invoke(WANTestBase.class, "addListenerOnQueueBucketRegion", new 
Object[] { "ln" , 4 });
+    vm7.invoke(WANTestBase.class, "addListenerOnQueueBucketRegion", new 
Object[] { "ln" , 4});
+    
+    vm6.invoke(WANTestBase.class, "doPuts", new Object[] { testName + "_PR",
+      1000 });
+    
+    vm6.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName + "_PR", 1000 });
+    
+    HashMap vm4BRUpdates = (HashMap)vm4.invoke(WANTestBase.class, "checkBR", 
new Object[] {testName + "_PR", 4});
+    HashMap vm5BRUpdates = (HashMap)vm5.invoke(WANTestBase.class, "checkBR", 
new Object[] {testName + "_PR", 4});
+    HashMap vm6BRUpdates = (HashMap)vm6.invoke(WANTestBase.class, "checkBR", 
new Object[] {testName + "_PR", 4});
+    HashMap vm7BRUpdates = (HashMap)vm7.invoke(WANTestBase.class, "checkBR", 
new Object[] {testName + "_PR", 4});
+    
+    List b0SenderUpdates = (List)vm4BRUpdates.get("Create0");
+    List b1SenderUpdates = (List)vm4BRUpdates.get("Create1");
+    List b2SenderUpdates = (List)vm4BRUpdates.get("Create2");
+    List b3SenderUpdates = (List)vm4BRUpdates.get("Create3");
+    
+    HashMap vm4QueueBRUpdates = (HashMap)vm4.invoke(WANTestBase.class, 
"checkQueue_BR", new Object[] {"ln", 4});
+    HashMap vm5QueueBRUpdates = (HashMap)vm5.invoke(WANTestBase.class, 
"checkQueue_BR", new Object[] {"ln", 4});
+    HashMap vm6QueueBRUpdates = (HashMap)vm6.invoke(WANTestBase.class, 
"checkQueue_BR", new Object[] {"ln", 4});
+    HashMap vm7QueueBRUpdates = (HashMap)vm7.invoke(WANTestBase.class, 
"checkQueue_BR", new Object[] {"ln", 4});
+    
+    assertEquals(vm4QueueBRUpdates, vm5QueueBRUpdates);
+    assertEquals(vm4QueueBRUpdates, vm6QueueBRUpdates);
+    assertEquals(vm4QueueBRUpdates, vm7QueueBRUpdates);
+    
+    vm4.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln"});
+    vm5.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln"});
+    vm6.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln"});
+    vm7.invoke(WANTestBase.class, "resumeSender", new Object[] { "ln"});
+    vm2.invoke(WANTestBase.class, "validateRegionSize", new Object[] {
+      testName + "_PR", 1000 });
+    HashMap receiverUpdates = (HashMap)vm2.invoke(WANTestBase.class, 
"checkPR", new Object[] {
+      testName + "_PR"});
+    List<Long> createList = (List)receiverUpdates.get("Create");
+    ArrayList<Long> b0ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b1ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b2ReceiverUpdates = new ArrayList<Long>();
+    ArrayList<Long> b3ReceiverUpdates = new ArrayList<Long>();
+    for (Long key : createList) {
+      long mod = key % 4;
+      if (mod == 0) {
+        b0ReceiverUpdates.add(key);
+      }
+      else if (mod == 1) {
+        b1ReceiverUpdates.add(key);
+      }
+      else if (mod == 2) {
+        b2ReceiverUpdates.add(key);
+      }
+      else if (mod == 3) {
+        b3ReceiverUpdates.add(key);
+      }
+    }
+    b0ReceiverUpdates.remove(0);
+    b1ReceiverUpdates.remove(0);
+    b2ReceiverUpdates.remove(0);
+    b3ReceiverUpdates.remove(0);
+    
+    assertEquals(b0SenderUpdates, b0ReceiverUpdates);
+    assertEquals(b1SenderUpdates, b1ReceiverUpdates);
+    assertEquals(b2SenderUpdates, b2ReceiverUpdates);
+    assertEquals(b3SenderUpdates, b3ReceiverUpdates);
+  }
+
+}

Reply via email to