http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java new file mode 100644 index 0000000..fe21691 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/CommonParallelGatewaySenderOffHeapDUnitTest.java @@ -0,0 +1,16 @@ +package com.gemstone.gemfire.internal.cache.wan.misc; + +@SuppressWarnings("serial") +public class CommonParallelGatewaySenderOffHeapDUnitTest extends + CommonParallelGatewaySenderDUnitTest { + + public CommonParallelGatewaySenderOffHeapDUnitTest(String name) { + super(name); + } + + @Override + public boolean isOffHeap() { + return true; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java new file mode 100644 index 0000000..4cff68d --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/NewWANConcurrencyCheckForDestroyDUnitTest.java @@ -0,0 +1,521 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +/** + * + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import java.util.HashMap; +import java.util.Map; + +import com.gemstone.gemfire.cache.CacheException; +import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache30.CacheSerializableRunnable; +import com.gemstone.gemfire.internal.cache.DistributedCacheOperation; +import com.gemstone.gemfire.internal.cache.EntrySnapshot; +import com.gemstone.gemfire.internal.cache.LocalRegion; +import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry; +import com.gemstone.gemfire.internal.cache.RegionEntry; +import com.gemstone.gemfire.internal.cache.Token.Tombstone; +import com.gemstone.gemfire.internal.cache.versions.VersionTag; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; + +/** + * @author shobhit + * + * Test verifies that version tag for destroyed entry is propagated back to + * origin distributed system if the version tag is applied and replaces old + * version information in destination distributed system. + * + * Version tag information which is relevant between multiple distributed + * systems consistency check is basically dsid and timestamp. + */ +public class NewWANConcurrencyCheckForDestroyDUnitTest extends WANTestBase { + + //These fields are used as BlackBoard for test data verification. + static long destroyTimeStamp; + static int destroyingMember; + + public NewWANConcurrencyCheckForDestroyDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + public void testVersionTagTimestampForDestroy() { + + + // create three distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 2 and Site 3 only know about Site 1 but Site 1 knows about both + // Site 2 and Site 3. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort }); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + //Site 3 + Integer tkPort = (Integer)vm4.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 3, lnPort }); + Integer tkRecPort = (Integer) vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort }); + + getLogWriter().info("Created locators and receivers in 3 distributed systems"); + + //Site 1 + vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + true, 10, 1, false, false, null, true }); + vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln2", 3, + true, 10, 1, false, false, null, true }); + + vm1.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {"repRegion", "ln1,ln2", 0, 1, isOffHeap() }); + vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln2" }); + vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln1" }); + vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln2" }); + + //Site 2 + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1, + true, 10, 1, false, false, null, true }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {"repRegion", "ny1", 0, 1, isOffHeap() }); + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" }); + vm3.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ny1" }); + + //Site 3 which only knows about Site 1. + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "tk1", 1, + true, 10, 1, false, false, null, true }); + + vm5.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {"repRegion", "tk1", 0, 1, isOffHeap() }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "tk1" }); + vm5.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "tk1" }); + + pause(2000); + + // Perform a put in vm1 + vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + + Region region = cache.getRegion("/repRegion"); + region.put("testKey", "testValue"); + + assertEquals(1, region.size()); + } + }); + + //wait for vm1 to propagate put to vm3 and vm5 + pause(2000); + + destroyTimeStamp = (Long) vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterOp"); + + //wait for vm1 to propagate destroyed entry's new version tag to vm5 + pause(2000); + + vm5.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "verifyTimestampAfterOp", + new Object[] {destroyTimeStamp, 1 /* ds 3 receives gatway event only from ds 1*/}); + } + + /** + * Test creates two sites and one Replicated Region on each with Serial + * GatewaySender on each. Test checks for sequence of events being sent from + * site1 to site2 for PUTALL and PUT and finally checks for final timestamp in + * version for RegionEntry with key "testKey". If timestamp on both site is + * same that means events were transferred in correct sequence. + */ + public void testPutAllEventSequenceOnSerialGatewaySenderWithRR() { + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort }); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + false, 10, 1, false, false, null, true }); + + vm1.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {"repRegion", "ln1", isOffHeap() }); + vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln1" }); + + //Site 2 + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1, + false, 10, 1, false, false, null, true }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {"repRegion", "ny1", isOffHeap() }); + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" }); + vm3.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ny1" }); + + pause(2000); + + // Perform a put in vm1 + AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; + + Region region = cache.getRegion("/repRegion"); + Map testMap = new HashMap(); + testMap.put("testKey", "testValue1"); + region.putAll(testMap); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + //wait for vm1 to propagate put to vm3 + pause(1000); + + AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + while (!region.containsKey("testKey")) { + pause(10); + } + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch1.join(5000); + asynch2.join(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { + + @Override + public void run2() throws CacheException { + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + } + }); + } + + //Wait for all Gateway events be received by vm3. + pause(1000); + + long putAllTimeStampVm1 = (Long) vm1.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + long putAllTimeStampVm3 = (Long) vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + +/** + * This is similar to above test but for PartitionedRegion. + */ +public void testPutAllEventSequenceOnSerialGatewaySenderWithPR() { + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort }); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + false, 10, 1, false, false, null, true }); + + vm1.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {"repRegion", "ln1", 0, 1, isOffHeap() }); + vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln1" }); + + //Site 2 + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1, + false, 10, 1, false, false, null, true }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] {"repRegion", "ny1", 0, 1, isOffHeap() }); + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" }); + vm3.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ny1" }); + + pause(2000); + + // Perform a put in vm1 + AsyncInvocation asynch1 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 2000; + + Region region = cache.getRegion("/repRegion"); + Map testMap = new HashMap(); + testMap.put("testKey", "testValue1"); + region.putAll(testMap); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + //wait for vm1 to propagate put to vm3 + pause(1000); + + AsyncInvocation asynch2 = vm1.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + while (!region.containsKey("testKey")) { + pause(10); + } + // Test hook to make put wait after RE lock is released but before Gateway events are sent. + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch1.join(5000); + asynch2.join(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + vm1.invoke(new CacheSerializableRunnable("Reset Test Hook") { + + @Override + public void run2() throws CacheException { + DistributedCacheOperation.SLOW_DISTRIBUTION_MS = 0; + } + }); + } + + //Wait for all Gateway events be received by vm3. + pause(1000); + + long putAllTimeStampVm1 = (Long) vm1.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + long putAllTimeStampVm3 = (Long) vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + + /** + * Tests if conflict checks are happening based on DSID and timestamp even if + * version tag is generated in local distributed system. + */ + public void testConflicChecksBasedOnDsidAndTimeStamp() { + + + // create two distributed systems with each having a cache containing + // a Replicated Region with one entry and concurrency checks enabled. + + // Site 1 + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer lnRecPort = (Integer) vm1.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort }); + + //Site 2 + Integer nyPort = (Integer)vm2.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + Integer nyRecPort = (Integer) vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + getLogWriter().info("Created locators and receivers in 2 distributed systems"); + + //Site 1 + vm1.invoke(WANTestBase.class, "createSender", new Object[] { "ln1", 2, + false, 10, 1, false, false, null, true }); + + vm1.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {"repRegion", "ln1", isOffHeap() }); + vm1.invoke(WANTestBase.class, "startSender", new Object[] { "ln1" }); + vm1.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ln1" }); + + //Site 2 + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {"repRegion", "ny1", isOffHeap() }); + + vm4.invoke(WANTestBase.class, "createCache", new Object[] { nyPort }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ny1", 1, + false, 10, 1, false, false, null, true }); + + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] {"repRegion", "ny1", isOffHeap() }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ny1" }); + vm4.invoke(WANTestBase.class, "waitForSenderRunningState", new Object[] { "ny1" }); + + pause(2000); + + // Perform a put in vm1 + vm1.invoke(new CacheSerializableRunnable("Putting an entry in ds1") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + + Region region = cache.getRegion("/repRegion"); + region.put("testKey", "testValue1"); + + assertEquals(1, region.size()); + } + }); + + //wait for vm4 to have later timestamp before sending operation to vm1 + pause(300); + + AsyncInvocation asynch = vm4.invokeAsync(new CacheSerializableRunnable("Putting an entry in ds2 in vm4") { + + @Override + public void run2() throws CacheException { + assertNotNull(cache); + Region region = cache.getRegion("/repRegion"); + + region.put("testKey", "testValue2"); + + assertEquals(1, region.size()); + assertEquals("testValue2", region.get("testKey")); + } + }); + + try { + asynch.join(2000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + //Wait for all local ds events be received by vm3. + pause(1000); + + vm3.invoke(new CacheSerializableRunnable("Check dsid") { + + @Override + public void run2() throws CacheException { + Region region = cache.getRegion("repRegion"); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ + true); //commented while merging revision 43582 + RegionEntry re = null; + if (entry instanceof EntrySnapshot) { + re = ((EntrySnapshot)entry).getRegionEntry(); + } else if (entry instanceof NonTXEntry) { + re = ((NonTXEntry)entry).getRegionEntry(); + } + VersionTag tag = re.getVersionStamp().asVersionTag(); + assertEquals(2, tag.getDistributedSystemId()); + } + }); + + // Check vm3 has latest timestamp from vm4. + long putAllTimeStampVm1 = (Long) vm4.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + long putAllTimeStampVm3 = (Long) vm3.invoke(NewWANConcurrencyCheckForDestroyDUnitTest.class, "getVersionTimestampAfterPutAllOp"); + + assertEquals(putAllTimeStampVm1, putAllTimeStampVm3); + } + + /* + * For VM1 in ds 1. Used in testPutAllEventSequenceOnSerialGatewaySender. + */ + public static long getVersionTimestampAfterPutAllOp() { + Region region = cache.getRegion("repRegion"); + + while (!(region.containsKey("testKey") /*&& region.get("testKey").equals("testValue2") */)) { + pause(10); + } + assertEquals(1, region.size()); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = null; + if (entry instanceof EntrySnapshot) { + re = ((EntrySnapshot)entry).getRegionEntry(); + } else if (entry instanceof NonTXEntry) { + re = ((NonTXEntry)entry).getRegionEntry(); + } + if (re != null) { + getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + return tag.getVersionTimeStamp(); + } else { + return -1; + } + } + + /* + * For VM3 in ds 2. + */ + public static long getVersionTimestampAfterOp() { + Region region = cache.getRegion("repRegion"); + assertEquals(1, region.size()); + + region.destroy("testKey"); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); + getLogWriter().fine("RegionEntry for testKey: " + re.getKey() + " " + re.getValueInVM((LocalRegion) region)); + assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + return tag.getVersionTimeStamp(); + } + + /* + * For VM 5 in ds 3. + */ + public static void verifyTimestampAfterOp(long timestamp, int memberid) { + Region region = cache.getRegion("repRegion"); + assertEquals(0, region.size()); + + Region.Entry entry = ((LocalRegion)region).getEntry("testKey", /*null,*/ true); + RegionEntry re = ((EntrySnapshot)entry).getRegionEntry(); + assertTrue(re.getValueInVM((LocalRegion) region) instanceof Tombstone); + + VersionTag tag = re.getVersionStamp().asVersionTag(); + assertEquals(timestamp, tag.getVersionTimeStamp()); + assertEquals(memberid, tag.getDistributedSystemId()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java new file mode 100644 index 0000000..cf78e24 --- /dev/null +++ b/gemfire-wan/src/test/java/com/gemstone/gemfire/internal/cache/wan/misc/PDXNewWanDUnitTest.java @@ -0,0 +1,779 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache.wan.misc; + +import com.gemstone.gemfire.cache.CacheWriterException; +import com.gemstone.gemfire.cache.client.ServerOperationException; +import com.gemstone.gemfire.internal.cache.wan.WANTestBase; + +import dunit.AsyncInvocation; + +public class PDXNewWanDUnitTest extends WANTestBase{ + + private static final long serialVersionUID = 1L; + + public PDXNewWanDUnitTest(String name) { + super(name); + } + + public void setUp() throws Exception { + super.setUp(); + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> DR is defined on member 1 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same DR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + */ + public void testWANPDX_RR_SerialSender() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> DR is defined on member 1 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same DR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + * 8> Bounce site 1 and delete all of it's data + * 9> Make sure that site 1 get the the PDX types along with entries + * and can deserialize entries. + */ + public void testWANPDX_RemoveRomoteData() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + + + //bounce vm2 + vm2.invoke(WANTestBase.class, "closeCache"); + + vm2.invoke(WANTestBase.class, "deletePDXDir"); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 2 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 2 }); + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> DR is defined on member 1 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same DR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + * 8> Bounce site 1 and delete all of it's data + * 9> Make some conflicting PDX registries in site 1 before the reconnect + * 10> Make sure we flag a warning about the conflicting updates. + */ + public void testWANPDX_ConflictingData() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + + //bounce vm3 + vm3.invoke(WANTestBase.class, "closeCache"); + + ExpectedException ex1 = addExpectedException("Trying to add a PDXType with the same id"); + ExpectedException ex2 = addExpectedException("CacheWriterException"); + ExpectedException ex3 = addExpectedException("does match the existing PDX type"); + ExpectedException ex4 = addExpectedException("ServerOperationException"); + ExpectedException ex5 = addExpectedException("Stopping the processor"); + + try { + //blow away vm3's PDX data + vm3.invoke(WANTestBase.class, "deletePDXDir"); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + //Define a different type from vm3 + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable2", new Object[] { testName + "_RR", + 2 }); + + //Give the updates some time to make it over the WAN + pause(10000); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + + vm3.invoke(WANTestBase.class, "closeCache"); + } finally { + ex1.remove(); + ex2.remove(); + ex3.remove(); + ex4.remove(); + ex5.remove(); + } + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> Site 3 : 1 locator, 1 member + * 3> DR is defined on member 1 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same DR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + */ + public void testWANPDX_RR_SerialSender3Sites() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + Integer tkPort = (Integer)vm2.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 3, lnPort }); + + vm3.invoke(WANTestBase.class, "createReceiver", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm5.invoke(WANTestBase.class, "createReceiver", new Object[] { tkPort }); + + + //Create all of our gateway senders + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ny", 2, + false, 100, 10, false, false, null, true }); + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "tk", 3, + false, 100, 10, false, false, null, true }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 1, + false, 100, 10, false, false, null, true }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "tk", 3, + false, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 1, + false, 100, 10, false, false, null, true }); + vm5.invoke(WANTestBase.class, "createSender", new Object[] { "ny", 2, + false, 100, 10, false, false, null, true }); + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ny,tk", isOffHeap() }); + vm4.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln,tk", isOffHeap() }); + vm5.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln,ny", isOffHeap() }); + + //Start all of the senders + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ny" }); + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "tk" }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "tk" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm5.invoke(WANTestBase.class, "startSender", new Object[] { "ny" }); + + //Pause ln to ny. This means the PDX type will not be dispatched + //to ny from ln + vm3.invoke(WANTestBase.class, "pauseSender", new Object[] { "ny" }); + + pause(5000); + + //Do some puts that define a PDX type in ln + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 1 }); + + //Make sure that tk received the update + vm5.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + + //Make ny didn't receive the update because the sender is paused + vm4.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 0 }); + + //Now, do a put from tk. This serialized object will be distributed + //to ny from tk, using the type defined by ln. + vm5.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 2 }); + + //Verify the ny can read the object + vm4.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 2 }); + + //Wait for vm3 to receive the update (prevents a broken pipe suspect string) + vm3.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 2 }); + } + + public void testWANPDX_RR_SerialSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 10 }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 40 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 40 }); + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> PR is defined on member 1 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same PR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + */ + + public void testWANPDX_PR_SerialSender() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 1 }); + } + + public void testWANPDX_PR_SerialSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 20 }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 40 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 40 }); + } + + /** + * Test + * 1> Site 1 : 1 locator, 2 member + * 2> Site 2 : 1 locator, 2 member + * 3> PR is defined on member 1, 2 on site1 + * 4> Serial GatewaySender is defined on member 1 on site1 + * 5> Same PR is defined on site2 member 1, 2 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + */ + + public void testWANPDX_PR_MultipleVM_SerialSender() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null,1, 5, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 5, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 5, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 10 }); + } + + public void testWANPDX_PR_MultipleVM_SerialSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 5, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 5, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 1, 5, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm4.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 40 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 40 }); + } + + /** + * Test + * 1> Site 1 : 1 locator, 1 member + * 2> Site 2 : 1 locator, 1 member + * 3> PR is defined on member 1 on site1 + * 4> Parallel GatewaySender is defined on member 1 on site1 + * 5> Same PR is defined on site2 member 1 + * 6> Put is done with value which is PDXSerializable + * 7> Validate whether other sites member receive this put operation. + */ + + public void testWANPDX_PR_ParallelSender() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 1, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 1, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 1}); + } + + public void testWANPDX_PR_ParallelSender_47826() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, true, + 100, 10, false, false, null, true }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 1, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "unsetRemoveFromQueueOnException", + new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { + testName + "_PR", 1 }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 1, isOffHeap() }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 1 }); + } + + public void testWANPDX_PR_ParallelSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 40 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 40 }); + } + + + public void testWANPDX_PR_MultipleVM_ParallelSender() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 10 }); + } + + public void testWANPDX_PR_MultipleVM_ParallelSender_StartedLater() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver_PDX", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createCache_PDX", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + AsyncInvocation inv1 = vm3.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + AsyncInvocation inv2 = vm4.invokeAsync(WANTestBase.class, "startSender", new Object[] { "ln" }); + + try{ + inv1.join(); + inv2.join(); + } + catch(InterruptedException ie) { + fail("Caught interrupted exception"); + } + + vm4.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 40 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 40 }); + } + + + public void testWANPDX_RR_SerialSenderWithFilter() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, new PDXGatewayEventFilter(), true }); + + vm2.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", null, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createReplicatedRegion", new Object[] { + testName + "_RR", "ln", isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_RR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_RR", 1 }); + + vm3.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new Object[] { 1}); + } + + + public void testWANPDX_PR_MultipleVM_ParallelSenderWithFilter() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + vm4.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, new PDXGatewayEventFilter(), true }); + vm4.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + true, 100, 10, false, false, new PDXGatewayEventFilter(), true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + vm4.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 10 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 10 }); + + vm3.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new Object[] { 5}); + vm4.invoke(PDXNewWanDUnitTest.class, "verifyFilterInvocation", new Object[] { 5}); + } + + + /** + * When remote site bounces then we should send pdx event again. + */ + + public void Bug_testWANPDX_PR_SerialSender_RemoteSite_Bounce() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm3.invoke(WANTestBase.class, "createCache", new Object[] { lnPort }); + + vm3.invoke(WANTestBase.class, "createSender", new Object[] { "ln", 2, + false, 100, 10, false, false, null, true }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "startSender", new Object[] { "ln" }); + + vm3.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", "ln", 0, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 1 }); + + vm2.invoke(WANTestBase.class, "killSender", new Object[] {}); + + vm2.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + vm4.invoke(WANTestBase.class, "createReceiver", new Object[] { nyPort }); + + vm2.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 2, isOffHeap() }); + vm4.invoke(WANTestBase.class, "createPartitionedRegion", new Object[] { + testName + "_PR", null, 1, 2, isOffHeap() }); + + vm3.invoke(WANTestBase.class, "doPutsPDXSerializable", new Object[] { testName + "_PR", + 1 }); + + vm2.invoke(WANTestBase.class, "validateRegionSize_PDX", new Object[] { + testName + "_PR", 1 }); + } + + + + + public static void verifyFilterInvocation(int invocation) { + assertEquals(((PDXGatewayEventFilter)eventFilter).beforeEnqueueInvoked, invocation); + assertEquals(((PDXGatewayEventFilter)eventFilter).beforeTransmitInvoked, invocation); + assertEquals(((PDXGatewayEventFilter)eventFilter).afterAckInvoked, invocation); + } + + + +}