Cleaning up some duplicate code in AsyncEventListenerDUnitTest. This class was now getting a "code too large" compilation error with the lambdas.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/58038ec1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/58038ec1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/58038ec1 Branch: refs/heads/feature/GEODE-866 Commit: 58038ec1ff842128a0337f29d73495188af0fe2b Parents: 51c03a9 Author: Dan Smith <upthewatersp...@apache.org> Authored: Thu Feb 11 16:41:44 2016 -0800 Committer: Dan Smith <upthewatersp...@apache.org> Committed: Tue Feb 16 14:08:40 2016 -0800 ---------------------------------------------------------------------- .../asyncqueue/AsyncEventListenerDUnitTest.java | 446 +++++-------------- 1 file changed, 108 insertions(+), 338 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/58038ec1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index f5ed32b..8bf2443 100644 --- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -84,10 +84,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testSerialAsyncEventQueueSize() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> 
AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); @@ -98,19 +95,9 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(1000);// pause at least for the batchTimeInterval vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", @@ -121,6 +108,13 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); } + + protected void createCaches(Integer lnPort) { + vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + vm6.invoke(() -> 
AsyncEventQueueTestBase.createCache( lnPort )); + vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + } /** * Added to reproduce defect #50366: @@ -129,20 +123,14 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testConcurrentSerialAsyncEventQueueSize() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY )); vm5.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 150, true, false, null, false, 2, OrderPolicy.KEY )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4 .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); @@ -159,6 +147,13 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm4size); assertEquals("Size of AsyncEventQueue is incorrect", 1000, vm5size); } + + protected void createReplicatedRegionWithAsyncEventQueue() { + vm4.invoke(() -> 
AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + } /** * Test configuration:: @@ -170,10 +165,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testReplicatedSerialAsyncEventQueue() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); @@ -184,18 +176,12 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + 
createReplicatedRegionWithAsyncEventQueue(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); } /** @@ -205,10 +191,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testReplicatedSerialAsyncEventQueueWithCacheLoader() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false, "MyAsyncEventListener_CacheLoader" )); @@ -248,10 +231,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testReplicatedSerialAsyncEventQueue_ExceptionScenario() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueueWithCustomListener( "ln", false, 100, 100, false, false, null, false, 1 )); @@ -262,19 +242,9 @@ public class 
AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueueWithCustomListener( "ln", false, 100, 100, false, false, null, false, 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(2000);// pause at least for the batchTimeInterval vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", @@ -300,10 +270,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testReplicatedSerialAsyncEventQueueWithConflationEnabled() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); @@ -314,19 +281,9 @@ public class 
AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(1000);// pause at least for the batchTimeInterval final Map keyValues = new HashMap(); @@ -367,10 +324,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); } @@ -400,18 +354,12 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); } /** @@ -426,10 +374,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testReplicatedSerialAsyncEventQueueWithPeristenceEnabled() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); @@ -440,13 +385,14 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); + validate1000Puts(); + } + + protected void validate1000Puts() { vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary @@ -466,10 +412,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void DISABLED_testReplicatedSerialAsyncEventQueueWithPeristenceEnabled_Restart() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); String firstDStore = (String)vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueueWithDiskStore( "ln", false, 100, 100, true, null )); @@ -549,10 +492,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void 
testConcurrentSerialAsyncEventQueueWithReplicatedRegion() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY )); @@ -563,17 +503,11 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, OrderPolicy.KEY )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary + validate1000Puts(); } /** @@ -587,10 +521,7 @@ public class AsyncEventListenerDUnitTest 
extends AsyncEventQueueTestBase { public void testConcurrentSerialAsyncEventQueueWithReplicatedRegion_2() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD )); @@ -601,10 +532,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, OrderPolicy.THREAD )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4.invokeAsync(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 500 )); @@ -614,10 +542,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { // vm4.invokeAsync(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", // 1000, 1500 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary - vm6.invoke(() -> 
AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary + validate1000Puts(); } /** @@ -627,10 +552,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testConcurrentSerialAsyncEventQueueWithoutOrderPolicy() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, null )); @@ -641,17 +563,11 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", false, 100, 100, true, false, null, false, 3, null )); - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); + createReplicatedRegionWithAsyncEventQueue(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// 
secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener("ln", 0 ));// secondary + validate1000Puts(); } /** @@ -663,10 +579,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testPartitionedSerialAsyncEventQueue() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); @@ -677,19 +590,20 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", 500 )); vm5.invoke(() -> AsyncEventQueueTestBase.doPutsFrom( getTestMethodName() + "_PR", 500, 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> 
AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); + } + + protected void createPartitionedRegions() { + vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); } /** @@ -701,10 +615,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testPartitionedSerialAsyncEventQueueWithConflationEnabled() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); @@ -715,19 +626,9 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, true, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> 
AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(2000); @@ -768,10 +669,18 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); + } + + protected void pauseAsyncEventQueues() { + vm4 + .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + vm5 + .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + vm6 + .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + vm7 + .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); } /** @@ -785,10 +694,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testPartitionedSerialAsyncEventQueueWithPeristenceEnabled() { Integer lnPort = 
(Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, true, null, false )); @@ -799,19 +705,13 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", false, 100, 100, false, true, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", 500 )); vm5.invoke(() -> AsyncEventQueueTestBase.doPutsFrom( getTestMethodName() + "_PR", 500, 1000 )); - vm4.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 1000 ));// primary sender - vm5.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm6.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary - vm7.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 ));// secondary + validate1000Puts(); } /** @@ -825,10 +725,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void 
testPartitionedSerialAsyncEventQueueWithPeristenceEnabled_Restart() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); String firstDStore = (String)vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueueWithDiskStore( "ln", false, 100, 100, true, null )); @@ -858,10 +755,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { try { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, true, false, null, false )); @@ -887,10 +781,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testParallelAsyncEventQueue() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false )); @@ -901,18 +792,12 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> 
AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", 256 )); - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + waitForAsyncQueueToGetEmpty(); int vm4size = (Integer)vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); int vm5size = (Integer)vm5.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); @@ -929,10 +814,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testParallelAsyncEventQueueWithCacheLoader() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false, 
"MyAsyncEventListener_CacheLoader" )); @@ -959,10 +841,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testParallelAsyncEventQueueSize() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false )); @@ -973,19 +852,9 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(1000);// pause at least for the batchTimeInterval vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", @@ -1005,10 +874,7 @@ public class 
AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testConcurrentParallelAsyncEventQueueSize() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY )); @@ -1019,19 +885,9 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createConcurrentAsyncEventQueue( "ln", true, 100, 100, false, false, null, false, 2, OrderPolicy.KEY )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(1000);// pause at least for the batchTimeInterval vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", @@ -1047,10 +903,7 @@ public class 
AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testParallelAsyncEventQueueWithConflationEnabled() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, true, false, null, false )); @@ -1061,19 +914,9 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, true, false, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(2000);// pause for the batchTimeInterval to ensure that all the // senders are paused @@ -1112,10 +955,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm6.invoke(() -> 
AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + waitForAsyncQueueToGetEmpty(); int vm4size = (Integer)vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); int vm5size = (Integer)vm5.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); @@ -1131,10 +971,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { public void testParallelAsyncEventQueueWithConflationEnabled_bug47213() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, true, false, null, false )); @@ -1150,14 +987,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm6.invoke(() -> AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); vm7.invoke(() -> AsyncEventQueueTestBase.createPRWithRedundantCopyWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm4 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm5 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm6 - .invoke(() -> AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); - vm7 - .invoke(() -> 
AsyncEventQueueTestBase.pauseAsyncEventQueue( "ln" )); + pauseAsyncEventQueues(); Wait.pause(2000);// pause for the batchTimeInterval to ensure that all the // senders are paused @@ -1195,10 +1025,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm6.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); vm7.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue( "ln" )); - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + waitForAsyncQueueToGetEmpty(); int vm4size = (Integer)vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); int vm5size = (Integer)vm5.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); @@ -1213,10 +1040,7 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); vm3.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm3.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, false, null, false )); @@ -1231,18 +1055,12 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm3.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionAccessorWithAsyncEventQueue( getTestMethodName() + "_PR", "ln" )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + 
"_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); vm3.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", 256 )); - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + waitForAsyncQueueToGetEmpty(); vm3.invoke(() -> AsyncEventQueueTestBase.validateAsyncEventListener( "ln", 0 )); @@ -1255,13 +1073,17 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { } + protected void waitForAsyncQueueToGetEmpty() { + vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + } + public void testParallelAsyncEventQueueWithPersistence() { Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); + createCaches(lnPort); vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", 
true, 100, 100, false, true, null, false )); @@ -1272,18 +1094,12 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", true, 100, 100, false, true, null, false )); - vm4.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createPartitionedRegionWithAsyncEventQueue( getTestMethodName() + "_PR", "ln", isOffHeap() )); + createPartitionedRegions(); vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_PR", 256 )); - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); + waitForAsyncQueueToGetEmpty(); int vm4size = (Integer)vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); int vm5size = (Integer)vm5.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln")); @@ -1293,52 +1109,6 @@ public class AsyncEventListenerDUnitTest extends AsyncEventQueueTestBase { assertEquals(vm4size + vm5size + vm6size + vm7size, 256); } - /** - * Below test is disabled intentionally Replicated region with Parallel Async - * Event queue is not supported. 
Test is added for the same - * testParallelAsyncEventQueueWithReplicatedRegion - * - * We are gone support this configuration in upcoming releases - */ - - public void DISABLED_DUETO_BUG51491_testReplicatedParallelAsyncEventQueue() { - Integer lnPort = (Integer)vm0.invoke(() -> AsyncEventQueueTestBase.createFirstLocatorWithDSId( 1 )); - - vm4.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm5.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm6.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - vm7.invoke(() -> AsyncEventQueueTestBase.createCache( lnPort )); - - vm4.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", - true, 100, 100, false, false, null, false )); - vm5.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", - true, 100, 100, false, false, null, false )); - vm6.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", - true, 100, 100, false, false, null, false )); - vm7.invoke(() -> AsyncEventQueueTestBase.createAsyncEventQueue( "ln", - true, 100, 100, false, false, null, false )); - - vm4.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm5.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm6.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - vm7.invoke(() -> AsyncEventQueueTestBase.createReplicatedRegionWithAsyncEventQueue( getTestMethodName() + "_RR", "ln", isOffHeap() )); - - vm4.invoke(() -> AsyncEventQueueTestBase.doPuts( getTestMethodName() + "_RR", - 1000 )); - - vm4.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm5.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm6.invoke(() -> AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - vm7.invoke(() -> 
AsyncEventQueueTestBase.waitForAsyncQueueToGetEmpty( "ln" )); - - int vm4size = (Integer)vm4.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" )); - int vm5size = (Integer)vm5.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" )); - int vm6size = (Integer)vm6.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" )); - int vm7size = (Integer)vm7.invoke(() -> AsyncEventQueueTestBase.getAsyncEventListenerMapSize( "ln" )); - - assertEquals(vm4size + vm5size + vm6size + vm7size, 1000); - } - /** * Test case to test possibleDuplicates. vm4 & vm5 are hosting the PR. vm5 is * killed so the buckets hosted by it are shifted to vm4.