http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
new file mode 100644
index 0000000..fe21691
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java
@@ -0,0 +1,16 @@
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+@SuppressWarnings("serial")
+public class CommonParallelGatewaySenderOffHeapDUnitTest extends
+    CommonParallelGatewaySenderDUnitTest {
+
+  public CommonParallelGatewaySenderOffHeapDUnitTest(String name) {
+    super(name);
+  }
+
+  @Override
+  public boolean isOffHeap() {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
new file mode 100644
index 0000000..4cff68d
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java
@@ -0,0 +1,521 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+/**
+ * 
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.internal.cache.DistributedCacheOperation;
+import com.gemstone.gemfire.internal.cache.EntrySnapshot;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.RegionEntry;
+import com.gemstone.gemfire.internal.cache.Token.Tombstone;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+/**
+ * @author shobhit
+ *
+ * Test verifies that version tag for destroyed entry is propagated back to
+ * origin distributed system if the version tag is applied and replaces old
+ * version information in destination distributed system.
+ *
+ * The version tag information relevant to consistency checks between
+ * distributed systems is essentially the dsid and the timestamp.
+ */
+public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase {
+
+  //These fields are used as BlackBoard for test data verification.
+  static long destroyTimeStamp;
+  static int destroyingMember;
+  
+  /**
+   * Creates the test case.
+   *
+   * @param name test name, handed unchanged to the superclass constructor
+   */
+  public NewWANConcurrencyCheckForDestroyDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the inherited set-up; this class adds no fixture of its own.
+   *
+   * @throws Exception if the inherited set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Destroys "testKey" in site 2 and verifies that site 3 ends up holding a
+   * tombstone with the same version timestamp, tagged with ds id 1.
+   * <p>
+   * vm0/vm1 are the locator and member of ds 1, vm2/vm3 of ds 2 and vm4/vm5
+   * of ds 3. Ds 1 gets a sender for each of the other two sites, whereas
+   * ds 2 and ds 3 each only get a sender for ds 1.
+   */
+  public void testVersionTagTimestampForDestroy() {
+    // Create three distributed systems, each with a cache hosting the
+    // partitioned region "repRegion".
+
+    // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both
+    // Site 2 and Site 3.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort });
+
+    // Site 2
+    Integer nyPort = (Integer)vm2.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    // Site 3
+    Integer tkPort = (Integer)vm4.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 3, lnPort });
+    vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
+
+    getLogWriter().info(
+        "Created locators and receivers in 3 distributed systems");
+
+    // Site 1
+    vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+      true, 10, 1, false, false, null, true });
+    vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln2", 3,
+      true, 10, 1, false, false, null, true });
+
+    vm1.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        "repRegion", "ln1,ln2", 0, 1, isOffHeap() });
+    vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+    vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" });
+    vm1.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ln1" });
+    vm1.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ln2" });
+
+    // Site 2
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1,
+      true, 10, 1, false, false, null, true });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        "repRegion", "ny1", 0, 1, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" });
+    vm3.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ny1" });
+
+    // Site 3 which only knows about Site 1.
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "tk1", 1,
+      true, 10, 1, false, false, null, true });
+
+    vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        "repRegion", "tk1", 0, 1, isOffHeap() });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "tk1" });
+    vm5.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "tk1" });
+
+    pause(2000);
+
+    // Perform a put in vm1
+    vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+
+        Region region = cache.getRegion("/repRegion");
+        region.put("testKey", "testValue");
+
+        assertEquals(1, region.size());
+      }
+    });
+
+    // wait for vm1 to propagate put to vm3 and vm5
+    pause(2000);
+
+    // vm3 destroys the entry and reports the timestamp of the resulting
+    // tombstone; the value is kept in the static blackboard field.
+    destroyTimeStamp = (Long) vm3.invoke(
+        NewWANConcurrencyCheckForDestroyDUnitTest.class,
+        "getVersionTimestampAfterOp");
+
+    // wait for vm1 to propagate destroyed entry's new version tag to vm5
+    pause(2000);
+
+    vm5.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class,
+        "verifyTimestampAfterOp",
+        new Object[] { destroyTimeStamp,
+            1 /* ds 3 receives gateway events only from ds 1 */ });
+  }
+
+  /**
+   * Test creates two sites and one Replicated Region on each with Serial
+   * GatewaySender on each. Test checks for sequence of events being sent from
+   * site1 to site2 for PUTALL and PUT and finally checks for final timestamp 
in
+   * version for RegionEntry with key "testKey". If timestamp on both site is
+   * same that means events were transferred in correct sequence.
+   */
+  public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() {
+    
+    // create two distributed systems with each having a cache containing
+    // a Replicated Region with one entry and concurrency checks enabled.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, 
"createReceiver", new Object[] { lnPort });
+    
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, 
"createReceiver", new Object[] { nyPort });
+
+    getLogWriter().info("Created locators and receivers in 2 distributed 
systems");
+     
+    //Site 1
+    vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+      false, 10, 1, false, false, null, true });
+    
+    vm1.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] 
{"repRegion", "ln1", isOffHeap() });
+    vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+    vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln1" });
+    
+    //Site 2
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1,
+      false, 10, 1, false, false, null, true });
+    
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] 
{"repRegion", "ny1", isOffHeap() });
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" });
+    vm3.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ny1" });
+    
+    pause(2000);
+    
+    // Perform a put in vm1
+    AsyncInvocation asynch1 = vm1.invokeAsync(new 
CacheSerializableRunnable("Putting an entry in ds1") {
+      
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        // Test hook to make put wait after RE lock is released but before 
Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; 
+        
+        Region region = cache.getRegion("/repRegion");
+        Map testMap = new HashMap();
+        testMap.put("testKey", "testValue1");
+        region.putAll(testMap);
+        
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    //wait for vm1 to propagate put to vm3
+    pause(1000); 
+
+    AsyncInvocation asynch2 = vm1.invokeAsync(new 
CacheSerializableRunnable("Putting an entry in ds1") {
+      
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+        
+        while (!region.containsKey("testKey")) {
+          pause(10);
+        }
+        // Test hook to make put wait after RE lock is released but before 
Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; 
+        
+        region.put("testKey", "testValue2");
+        
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch1.join(5000);
+      asynch2.join(5000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+        
+        @Override
+        public void run2() throws CacheException {
+          DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+        }
+      });
+    }
+
+    //Wait for all Gateway events be received by vm3.
+    pause(1000);
+
+    long putAllTimeStampVm1 = (Long) 
vm1.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, 
"getVersionTimestampAfterPutAllOp");
+    
+    long putAllTimeStampVm3 = (Long) 
vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, 
"getVersionTimestampAfterPutAllOp");
+    
+    assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+  }
+
+/**
+ * This is similar to above test but for PartitionedRegion.
+ */
+public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() {
+    
+    // create two distributed systems with each having a cache containing
+    // a Replicated Region with one entry and concurrency checks enabled.
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, 
"createReceiver", new Object[] { lnPort });
+    
+    //Site 2
+    Integer nyPort = (Integer)vm2.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, 
"createReceiver", new Object[] { nyPort });
+
+    getLogWriter().info("Created locators and receivers in 2 distributed 
systems");
+     
+    //Site 1
+    vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+      false, 10, 1, false, false, null, true });
+    
+    vm1.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] 
{"repRegion", "ln1", 0, 1, isOffHeap() });
+    vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+    vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ln1" });
+    
+    //Site 2
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1,
+      false, 10, 1, false, false, null, true });
+    
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] 
{"repRegion", "ny1", 0, 1, isOffHeap() });
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" });
+    vm3.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { 
"ny1" });
+    
+    pause(2000);
+    
+    // Perform a put in vm1
+    AsyncInvocation asynch1 = vm1.invokeAsync(new 
CacheSerializableRunnable("Putting an entry in ds1") {
+      
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        // Test hook to make put wait after RE lock is released but before 
Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; 
+        
+        Region region = cache.getRegion("/repRegion");
+        Map testMap = new HashMap();
+        testMap.put("testKey", "testValue1");
+        region.putAll(testMap);
+        
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    //wait for vm1 to propagate put to vm3
+    pause(1000); 
+
+    AsyncInvocation asynch2 = vm1.invokeAsync(new 
CacheSerializableRunnable("Putting an entry in ds1") {
+      
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+        
+        while (!region.containsKey("testKey")) {
+          pause(10);
+        }
+        // Test hook to make put wait after RE lock is released but before 
Gateway events are sent.
+        DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; 
+        
+        region.put("testKey", "testValue2");
+        
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch1.join(5000);
+      asynch2.join(5000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") {
+        
+        @Override
+        public void run2() throws CacheException {
+          DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0;
+        }
+      });
+    }
+
+    //Wait for all Gateway events be received by vm3.
+    pause(1000);
+
+    long putAllTimeStampVm1 = (Long) 
vm1.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, 
"getVersionTimestampAfterPutAllOp");
+    
+    long putAllTimeStampVm3 = (Long) 
vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, 
"getVersionTimestampAfterPutAllOp");
+    
+    assertEquals(putAllTimeStampVm1, putAllTimeStampVm3);
+  }
+
+  /**
+   * Tests if conflict checks are happening based on DSID and timestamp even if
+   * version tag is generated in local distributed system.
+   * <p>
+   * vm1 (ds 1) puts "testKey" first, then vm4 (ds 2) puts a newer value for
+   * the same key. Afterwards vm3, the other member of ds 2, must hold a
+   * version tag with ds id 2 and the same version timestamp as vm4.
+   */
+  public void testConflicChecksBasedOnDsidAndTimeStamp() {
+
+    // Create two distributed systems, each with a cache hosting the
+    // replicated region "repRegion".
+
+    // Site 1
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort });
+
+    // Site 2
+    Integer nyPort = (Integer)vm2.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    getLogWriter().info(
+        "Created locators and receivers in 2 distributed systems");
+
+    // Site 1
+    vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2,
+      false, 10, 1, false, false, null, true });
+
+    vm1.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        "repRegion", "ln1", isOffHeap() });
+    vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" });
+    vm1.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ln1" });
+
+    // Site 2: vm3 only hosts the region, the sender "ny1" lives in vm4.
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        "repRegion", "ny1", isOffHeap() });
+
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { nyPort });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1,
+      false, 10, 1, false, false, null, true });
+
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        "repRegion", "ny1", isOffHeap() });
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" });
+    vm4.invoke(WANTestBase.class, "waitForSenderRunningState",
+        new Object[] { "ny1" });
+
+    pause(2000);
+
+    // Perform a put in vm1
+    vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") {
+
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+
+        Region region = cache.getRegion("/repRegion");
+        region.put("testKey", "testValue1");
+
+        assertEquals(1, region.size());
+      }
+    });
+
+    // wait for vm4 to have later timestamp before sending operation to vm1
+    pause(300);
+
+    AsyncInvocation asynch = vm4.invokeAsync(
+        new CacheSerializableRunnable("Putting an entry in ds2 in vm4") {
+
+      @Override
+      public void run2() throws CacheException {
+        assertNotNull(cache);
+        Region region = cache.getRegion("/repRegion");
+
+        region.put("testKey", "testValue2");
+
+        assertEquals(1, region.size());
+        assertEquals("testValue2", region.get("testKey"));
+      }
+    });
+
+    try {
+      asynch.join(2000);
+    } catch (InterruptedException e) {
+      // Restore the interrupt status and fail instead of silently going on.
+      Thread.currentThread().interrupt();
+      fail("Interrupted while waiting for the async put in vm4");
+    }
+
+    // Wait for all local ds events be received by vm3.
+    pause(1000);
+
+    vm3.invoke(new CacheSerializableRunnable("Check dsid") {
+
+      @Override
+      public void run2() throws CacheException {
+        Region region = cache.getRegion("repRegion");
+
+        Region.Entry entry = ((LocalRegion)region).getEntry("testKey", true);
+        RegionEntry re = null;
+        if (entry instanceof EntrySnapshot) {
+          re = ((EntrySnapshot)entry).getRegionEntry();
+        } else if (entry instanceof NonTXEntry) {
+          re = ((NonTXEntry)entry).getRegionEntry();
+        }
+        // Fail with a readable message rather than a NullPointerException
+        // when the entry is missing or of an unexpected type.
+        assertNotNull("No RegionEntry available for testKey in vm3", re);
+        VersionTag tag = re.getVersionStamp().asVersionTag();
+        assertEquals(2, tag.getDistributedSystemId());
+      }
+    });
+
+    // Check vm3 has latest timestamp from vm4.
+    long timeStampVm4 = (Long) vm4.invoke(
+        NewWANConcurrencyCheckForDestroyDUnitTest.class,
+        "getVersionTimestampAfterPutAllOp");
+
+    long timeStampVm3 = (Long) vm3.invoke(
+        NewWANConcurrencyCheckForDestroyDUnitTest.class,
+        "getVersionTimestampAfterPutAllOp");
+
+    assertEquals(timeStampVm4, timeStampVm3);
+  }
+  
+  /**
+   * Returns the version timestamp stored for "testKey" in "repRegion" of the
+   * VM this is invoked in. Used by the put/putAll ordering tests and by
+   * testConflicChecksBasedOnDsidAndTimeStamp.
+   * <p>
+   * Polls with {@code pause(10)} until the key is present. NOTE(review):
+   * the loop has no upper bound, so a key that never arrives hangs the VM.
+   *
+   * @return the version timestamp of the entry, or -1 when the entry is
+   *         neither an EntrySnapshot nor a NonTXEntry
+   */
+  public static long getVersionTimestampAfterPutAllOp() {
+    Region region = cache.getRegion("repRegion");
+
+    while (!region.containsKey("testKey")) {
+      pause(10);
+    }
+    assertEquals(1, region.size());
+
+    Region.Entry entry = ((LocalRegion)region).getEntry("testKey", true);
+    RegionEntry re = null;
+    if (entry instanceof EntrySnapshot) {
+      re = ((EntrySnapshot)entry).getRegionEntry();
+    } else if (entry instanceof NonTXEntry) {
+      re = ((NonTXEntry)entry).getRegionEntry();
+    }
+    if (re == null) {
+      return -1;
+    }
+
+    getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " "
+        + re.getValueInVM((LocalRegion) region));
+
+    VersionTag tag = re.getVersionStamp().asVersionTag();
+    return tag.getVersionTimeStamp();
+  }
+
+  /**
+   * Destroys "testKey" in "repRegion" and returns the version timestamp of
+   * what is left behind. Invoked in vm3 (ds 2) by
+   * testVersionTagTimestampForDestroy.
+   * <p>
+   * Asserts that the region holds exactly one entry beforehand and that the
+   * value found after the destroy is a Tombstone.
+   *
+   * @return the version timestamp of the destroyed entry
+   */
+  public static long getVersionTimestampAfterOp() {
+    Region region = cache.getRegion("repRegion");
+    assertEquals(1, region.size());
+
+    region.destroy("testKey");
+
+    Region.Entry entry = ((LocalRegion)region).getEntry("testKey", true);
+    // Fail with a readable message rather than a NullPointerException.
+    assertNotNull("No entry found for destroyed testKey", entry);
+    RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
+    getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " "
+        + re.getValueInVM((LocalRegion) region));
+    assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
+
+    VersionTag tag = re.getVersionStamp().asVersionTag();
+    return tag.getVersionTimeStamp();
+  }
+
+  /**
+   * Verifies that "repRegion" is empty and that "testKey" is represented by
+   * a Tombstone whose version tag carries the expected timestamp and
+   * distributed system id. Invoked in vm5 (ds 3) by
+   * testVersionTagTimestampForDestroy.
+   *
+   * @param timestamp expected version timestamp of the tombstone
+   * @param memberid expected distributed system id in the version tag
+   */
+  public static void verifyTimestampAfterOp(long timestamp, int memberid) {
+    Region region = cache.getRegion("repRegion");
+    assertEquals(0, region.size());
+
+    Region.Entry entry = ((LocalRegion)region).getEntry("testKey", true);
+    // Fail with a readable message rather than a NullPointerException when
+    // no entry for the key is found here.
+    assertNotNull("No entry found for testKey", entry);
+    RegionEntry re = ((EntrySnapshot)entry).getRegionEntry();
+    assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone);
+
+    VersionTag tag = re.getVersionStamp().asVersionTag();
+    assertEquals(timestamp, tag.getVersionTimeStamp());
+    assertEquals(memberid, tag.getDistributedSystemId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
new file mode 100644
index 0000000..cf78e24
--- /dev/null
+++ 
b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java
@@ -0,0 +1,779 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache.wan.misc;
+
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.internal.cache.wan.WANTestBase;
+
+import dunit.AsyncInvocation;
+
+public class PDXNewWanDUnitTest extends WANTestBase{
+
+  private static final long serialVersionUID = 1L;
+  
+  /**
+   * Creates the test case.
+   *
+   * @param name test name, handed unchanged to the superclass constructor
+   */
+  public PDXNewWanDUnitTest(String name) {
+    super(name);
+  }
+
+  /**
+   * Runs the inherited set-up; this class adds no fixture of its own.
+   *
+   * @throws Exception if the inherited set-up fails
+   */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+  }
+
+  /**
+   * Test
+   *   1> Site 1 : locator in vm0, member in vm3
+   *   2> Site 2 : locator in vm1, receiver in vm2
+   *   3> Sender "ln" (remote ds id 2) is created and started in vm3
+   *   4> The same replicated region is created in vm2 (no sender ids) and
+   *      in vm3 (attached to "ln")
+   *   5> vm3 does one put with a PDXSerializable value
+   *   6> Validate that vm2 receives this put operation.
+   */
+  public void testWANPDX_RR_SerialSender() {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, "ln", isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] {
+        regionName, 1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        regionName, 1 });
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : locator in vm0, member in vm3
+   *   2> Site 2 : locator in vm1, receiver in vm2
+   *   3> Sender "ln" (remote ds id 2) is created and started in vm3
+   *   4> The same replicated region is created in vm2 (no sender ids) and
+   *      in vm3 (attached to "ln")
+   *   5> vm3 does one put with a PDXSerializable value
+   *   6> Validate that vm2 receives this put operation.
+   *   7> Bounce the receiver (vm2) and delete its PDX directory
+   *   8> vm3 puts again; validate that the recreated vm2 ends up with both
+   *      entries, i.e. it received the PDX types along with the entries.
+   */
+  public void testWANPDX_RemoveRomoteData() {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX",
+        new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, "ln", isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] {
+        regionName, 1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        regionName, 1 });
+
+    // bounce vm2 and wipe its PDX data
+    vm2.invoke(WANTestBase.class, "closeCache");
+
+    vm2.invoke(WANTestBase.class, "deletePDXDir");
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX",
+        new Object[] { nyPort });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] {
+        regionName, 2 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        regionName, 2 });
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : locator in vm0, member in vm3
+   *   2> Site 2 : locator in vm1, receiver in vm2
+   *   3> Sender "ln" (remote ds id 2) is created and started in vm3
+   *   4> The same replicated region is created in vm2 (no sender ids) and
+   *      in vm3 (attached to "ln")
+   *   5> vm3 does one put with a PDXSerializable value
+   *   6> Validate that vm2 receives this put operation.
+   *   7> Bounce vm3 and delete its PDX directory
+   *   8> vm3 puts values of a different PDX type (doPutsPDXSerializable2)
+   *   9> Validate that vm2 still holds a single entry; the conflict
+   *      messages logged meanwhile are registered as expected exceptions.
+   */
+  public void testWANPDX_ConflictingData() {
+    final String regionName = testName + "_RR";
+
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX",
+        new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        regionName, "ln", isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] {
+        regionName, 1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        regionName, 1 });
+
+    // bounce vm3
+    vm3.invoke(WANTestBase.class, "closeCache");
+
+    // Register the messages the conflicting PDX type is expected to cause.
+    ExpectedException ex1 = addExpectedException(
+        "Trying to add a PDXType with the same id");
+    ExpectedException ex2 = addExpectedException("CacheWriterException");
+    ExpectedException ex3 = addExpectedException(
+        "does match the existing PDX type");
+    ExpectedException ex4 = addExpectedException("ServerOperationException");
+    ExpectedException ex5 = addExpectedException("Stopping the processor");
+
+    try {
+      // blow away vm3's PDX data
+      vm3.invoke(WANTestBase.class, "deletePDXDir");
+
+      vm3.invoke(WANTestBase.class, "createCache_PDX",
+          new Object[] { lnPort });
+
+      vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+          false, 100, 10, false, false, null, true });
+
+      vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+      vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+          regionName, "ln", isOffHeap() });
+
+      // Define a different type from vm3
+      vm3.invoke(WANTestBase.class, "doPutsPDXSerializable2", new Object[] {
+          regionName, 2 });
+
+      // Give the updates some time to make it over the WAN
+      pause(10000);
+
+      vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+          regionName, 1 });
+
+      vm3.invoke(WANTestBase.class, "closeCache");
+    } finally {
+      ex1.remove();
+      ex2.remove();
+      ex3.remove();
+      ex4.remove();
+      ex5.remove();
+    }
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : 1 locator, 1 member
+   *   2> Site 2 : 1 locator, 1 member
+   *   3> Site 3 : 1 locator, 1 member
+   *   3> DR is defined on  member 1 on site1
+   *   4> Serial GatewaySender is defined on member 1 on site1
+   *   5> Same DR is defined on site2 member 1
+   *   6> Put is done with value which is PDXSerializable
+   *   7> Validate whether other sites member receive this put operation.    
+   */
+  public void testWANPDX_RR_SerialSender3Sites() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+    
+    Integer tkPort = (Integer)vm2.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 3, lnPort });
+
+    vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort });
+
+
+    //Create all of our gateway senders
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny", 2,
+        false, 100, 10, false, false, null, true });
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "tk", 3,
+      false, 100, 10, false, false, null, true });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 1,
+      false, 100, 10, false, false, null, true });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "tk", 3,
+    false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 1,
+      false, 100, 10, false, false, null, true });
+    vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ny", 2,
+    false, 100, 10, false, false, null, true });
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+      testName + "_RR", "ny,tk", isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+      testName + "_RR", "ln,tk", isOffHeap() });
+    vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+      testName + "_RR", "ln,ny", isOffHeap() });
+    
+    //Start all of the senders
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny" });
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "tk" });
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "tk" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ny" });
+    
+    //Pause ln to ny. This means the PDX type will not be dispatched
+    //to ny from ln
+    vm3.invoke(WANTestBase.class, "pauseSender", new Object[] { "ny" });
+    
+    pause(5000);
+    
+    //Do some puts that define a PDX type in ln
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_RR",
+      1 });
+    
+    //Make sure that tk received the update
+    vm5.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+      testName + "_RR", 1 });
+    
+    //Make sure ny didn't receive the update because the sender is paused
+    vm4.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+      testName + "_RR", 0 });
+    
+    //Now, do a put from tk. This serialized object will be distributed
+    //to ny from tk, using the type defined by ln.
+    vm5.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_RR",
+      2 });
+    
+    //Verify the ny can read the object
+    vm4.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+      testName + "_RR", 2 });
+    
+    //Wait for vm3 to receive the update (prevents a broken pipe suspect 
string) 
+    vm3.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+      testName + "_RR", 2 });
+  }
+  
+  public void testWANPDX_RR_SerialSender_StartedLater() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_RR",
+        10 });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_RR",
+      40 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_RR", 40 });
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : 1 locator, 1 member
+   *   2> Site 2 : 1 locator, 1 member
+   *   3> PR is defined on member 1 on site1
+   *   4> Serial GatewaySender is defined on member 1 on site1
+   *   5> Same PR is defined on site2 member 1
+   *   6> Put is done with value which is PDXSerializable
+   *   7> Validate whether the other site's member receives this put operation.
+   */
+  
+  public void testWANPDX_PR_SerialSender() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 1 });
+  }
+  
+  public void testWANPDX_PR_SerialSender_StartedLater() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort 
});
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        20 });
+    
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+      40 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 40 });
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : 1 locator, 2 members
+   *   2> Site 2 : 1 locator, 2 members
+   *   3> PR is defined on member 1, 2 on site1
+   *   4> Serial GatewaySender is defined on member 1 on site1
+   *   5> Same PR is defined on site2 member 1, 2
+   *   6> Put is done with value which is PDXSerializable
+   *   7> Validate whether the other site's members receive this put operation.
+   */
+  
+  public void testWANPDX_PR_MultipleVM_SerialSender() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null,1, 5, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 5, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 1, 5, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 10 });
+  }
+  
+  public void testWANPDX_PR_MultipleVM_SerialSender_StartedLater() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort 
});
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+    
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 1, 5, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 1, 5, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 1, 5, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm4.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+      40 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 40 });
+  }
+  
+  /**
+   * Test
+   *   1> Site 1 : 1 locator, 1 member
+   *   2> Site 2 : 1 locator, 1 member
+   *   3> PR is defined on member 1 on site1
+   *   4> Parallel GatewaySender is defined on member 1 on site1
+   *   5> Same PR is defined on site2 member 1
+   *   6> Put is done with value which is PDXSerializable
+   *   7> Validate whether the other site's member receives this put operation.
+   */
+  
+  public void testWANPDX_PR_ParallelSender() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", null, 0, 1, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 1, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 1});
+  }
+  
+  public void testWANPDX_PR_ParallelSender_47826() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true,
+        100, 10, false, false, null, true });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 0, 1, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "unsetRemoveFromQueueOnException",
+        new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] {
+        testName + "_PR", 1 });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 1, isOffHeap() });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 1 });
+  }
+  
+  public void testWANPDX_PR_ParallelSender_StartedLater() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort 
});
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+      40 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 40 });
+  }
+  
+  
+  public void testWANPDX_PR_MultipleVM_ParallelSender() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 10 });
+  }
+  
+  public void testWANPDX_PR_MultipleVM_ParallelSender_StartedLater() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort 
});
+
+    vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, null, true });
+   vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, null, true });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    AsyncInvocation inv1 = vm3.invokeAsync(WANTestBase.class, "startSender", 
new Object[] { "ln" });
+    AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "startSender", 
new Object[] { "ln" });
+    
+    try{
+      inv1.join();
+      inv2.join();
+    }
+    catch(InterruptedException ie) {
+      fail("Caught interrupted exception");
+    }
+    
+    vm4.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+      40 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 40 });
+  }
+  
+  
+  public void testWANPDX_RR_SerialSenderWithFilter() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, new PDXGatewayEventFilter(), true });
+
+    vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", null, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {
+        testName + "_RR", "ln", isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_RR",
+        1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_RR", 1 });
+    
+    vm3.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new 
Object[] { 1});
+  }
+  
+  
+  public void testWANPDX_PR_MultipleVM_ParallelSenderWithFilter() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+    vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        true, 100, 10, false, false, new PDXGatewayEventFilter(), true });
+    vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+      true, 100, 10, false, false, new PDXGatewayEventFilter(), true });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        10 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 10 });
+    
+    vm3.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new 
Object[] { 5});
+    vm4.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new 
Object[] { 5});
+  }
+  
+  
+  /**
+   * When remote site bounces then we should send pdx event again.
+   */
+  
+  public void Bug_testWANPDX_PR_SerialSender_RemoteSite_Bounce() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+
+    vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort });
+
+    vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2,
+        false, 100, 10, false, false, null, true });
+
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", null, 0, 2, isOffHeap() });
+
+    vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" });
+
+    vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+        testName + "_PR", "ln", 0, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+        1 });
+
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+        testName + "_PR", 1 });
+    
+    vm2.invoke(WANTestBase.class, "killSender", new Object[] {});
+    
+    vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort });
+    
+    vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", null, 1, 2, isOffHeap() });
+    vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {
+      testName + "_PR", null, 1, 2, isOffHeap() });
+    
+    vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { 
testName + "_PR",
+      1 });
+    
+    vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] {
+      testName + "_PR", 1 });
+  }
+  
+
+  
+  
+  public static void verifyFilterInvocation(int invocation) {
+    assertEquals(((PDXGatewayEventFilter)eventFilter).beforeEnqueueInvoked, 
invocation);
+    assertEquals(((PDXGatewayEventFilter)eventFilter).beforeTransmitInvoked, 
invocation);
+    assertEquals(((PDXGatewayEventFilter)eventFilter).afterAckInvoked, 
invocation);
+  }
+
+  
+  
+}

Reply via email to