This is an automated email from the ASF dual-hosted git repository. jinmeiliao pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit e3ef06166bea60459ec2ae33beca2e547af4224c Author: Kirk Lund <kl...@apache.org> AuthorDate: Mon Apr 19 16:16:17 2021 -0700 GEODE-9132: PartitionedRegionClearWithConcurrentOperationsDUnitTest cleanup 2 --- ...gionClearWithConcurrentOperationsDUnitTest.java | 782 ++++++++++----------- 1 file changed, 378 insertions(+), 404 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java index c9a1e5b..b2aacc0 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java @@ -72,8 +72,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private static final int BUCKETS = 13; private static final String REGION_NAME = "PartitionedRegion"; - private static final String TEST_CASE_NAME = - "[{index}] {method}(Coordinator:{0}, RegionType:{1})"; @Rule public DistributedRule distributedRule = new DistributedRule(3); @@ -84,32 +82,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements private VM server2; private VM accessor; - @SuppressWarnings("unused") - static TestVM[] coordinators() { - return new TestVM[] { - TestVM.SERVER1, TestVM.ACCESSOR - }; - } - - @SuppressWarnings("unused") - static Object[] coordinatorsAndRegionTypes() { - List<Object[]> parameters = new ArrayList<>(); - RegionShortcut[] regionShortcuts = regionTypes(); - - Arrays.stream(regionShortcuts).forEach(regionShortcut -> { - parameters.add(new Object[] {TestVM.SERVER1, regionShortcut}); - parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut}); - }); - - return parameters.toArray(); - } - - private static 
RegionShortcut[] regionTypes() { - return new RegionShortcut[] { - RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT - }; - } - @Before public void setUp() throws Exception { server1 = getVM(TestVM.SERVER1.vmNumber); @@ -117,127 +89,414 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements accessor = getVM(TestVM.ACCESSOR.vmNumber); } - private void initAccessor(RegionShortcut regionShortcut) { - PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() - .setTotalNumBuckets(BUCKETS) - .setLocalMaxMemory(0) - .create(); + /** + * The test does the following (clear coordinator and regionType are parametrized): + * - Launches one thread per VM to continuously execute removes, puts and gets for a given time. + * - Clears the Partition Region continuously every X milliseconds for a given time. + * - Asserts that, after the clears have finished, the Region Buckets are consistent across + * members. + */ + @Test + @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION", + "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"}) + @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") + public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM, + RegionShortcut regionShortcut) throws InterruptedException { + parametrizedSetup(regionShortcut); - cacheRule.getCache().createRegionFactory(regionShortcut) - .setPartitionAttributes(attrs) - .create(REGION_NAME); + // Let all VMs continuously execute puts and gets for 60 seconds. + final int workMillis = 60000; + final int entries = 15000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executePuts(entries, workMillis)), + server2.invokeAsync(() -> executeGets(entries, workMillis)), + accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - } + // Clear the region every second for 60 seconds. 
+ getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000)); - private void initDataStore(RegionShortcut regionShortcut) { - PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() - .setTotalNumBuckets(BUCKETS) - .create(); + // Let asyncInvocations finish. + for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); + } - cacheRule.getCache().createRegionFactory(regionShortcut) - .setPartitionAttributes(attrs) - .create(REGION_NAME); + // Assert Region Buckets are consistent. + asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); + accessor.invoke(this::assertRegionBucketsConsistency); } - private void parametrizedSetup(RegionShortcut regionShortcut) { - server1.invoke(() -> initDataStore(regionShortcut)); - server2.invoke(() -> initDataStore(regionShortcut)); - accessor.invoke(() -> initAccessor(regionShortcut)); - } + /** + * The test does the following (clear coordinator and regionType are parametrized): + * - Launches two threads per VM to continuously execute putAll and removeAll for a given time. + * - Clears the Partition Region continuously every X milliseconds for a given time. + * - Asserts that, after the clears have finished, the Region Buckets are consistent across + * members. 
+ */ + @Test + @Parameters({"SERVER1,PARTITION", "ACCESSOR,PARTITION", + "SERVER1,PARTITION_REDUNDANT", "ACCESSOR,PARTITION_REDUNDANT"}) + @TestCaseName("[{index}] {method}(Coordinator:{0}, RegionType:{1})") + public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM, + RegionShortcut regionShortcut) throws InterruptedException { + parametrizedSetup(regionShortcut); - private void waitForSilence() { - DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats(); - PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); - PartitionedRegionStats partitionedRegionStats = region.getPrStats(); + // Let all VMs continuously execute putAll and removeAll for 15 seconds. + final int workMillis = 15000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)), + server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)), + server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)), + server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)), + accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)), + accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis))); - await().untilAsserted(() -> { - assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0); - assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0); - assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0); - assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0); - assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0); - assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0); - }); + // Clear the region every half second for 15 seconds. + getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500)); + + // Let asyncInvocations finish. 
+ for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); + } + + // Assert Region Buckets are consistent. + asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); + accessor.invoke(this::assertRegionBucketsConsistency); } /** - * Populates the region and verifies the data on the selected VMs. + * The test does the following (regionType is parametrized): + * - Populates the Partition Region. + * - Verifies that the entries are synchronized on all members. + * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the + * coordinator VM while the clear is in progress. + * - Clears the Partition Region (at this point the coordinator is restarted). + * - Asserts that, after the member joins again, the Region Buckets are consistent. */ - private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) { - feeder.invoke(() -> { + @Test + @Parameters({"PARTITION", "PARTITION_REDUNDANT"}) + @TestCaseName("[{index}] {method}(RegionType:{0})") + public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) { + parametrizedSetup(regionShortcut); + final int entries = 1000; + populateRegion(accessor, entries, asList(accessor, server1, server2)); + + // Set the CoordinatorMemberKiller and try to clear the region + server1.invoke(() -> { + DistributionMessageObserver.setInstance(new MemberKiller(true)); Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); - IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); + assertThatThrownBy(region::clear) + .isInstanceOf(DistributedSystemDisconnectedException.class) + .hasCauseInstanceOf(ForcedDisconnectException.class); }); - vms.forEach(vm -> vm.invoke(() -> { - waitForSilence(); - Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + // Wait for member to get back online and assign all buckets. 
+ server1.invoke(() -> { + cacheRule.createCache(); + initDataStore(regionShortcut); + await().untilAsserted( + () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); + PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME)); + }); - IntStream.range(0, entryCount) - .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i)); - })); + // Assert Region Buckets are consistent. + asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); + accessor.invoke(this::assertRegionBucketsConsistency); } /** - * Asserts that the RegionVersionVectors for both buckets are consistent. - * - * @param bucketId Id of the bucket to compare. - * @param bucketDump1 First bucketDump. - * @param bucketDump2 Second bucketDump. + * The test does the following (clear coordinator is chosen through parameters): + * - Populates the Partition Region. + * - Verifies that the entries are synchronized on all members. + * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a + * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so + * participates on the clear operation). + * - Launches two threads per VM to continuously execute gets, puts and removes for a given time. + * - Clears the Partition Region (at this point the non-coordinator is restarted). + * - Asserts that, after the clear has finished, the Region Buckets are consistent across members. 
*/ - private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1, - BucketDump bucketDump2) { - RegionVersionVector<?> rvv1 = bucketDump1.getRvv(); - RegionVersionVector<?> rvv2 = bucketDump2.getRvv(); + @Test + @Parameters({"SERVER1", "ACCESSOR"}) + @TestCaseName("[{index}] {method}(Coordinator:{0})") + public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( + TestVM coordinatorVM) throws InterruptedException { + parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); + final int entries = 7500; + populateRegion(accessor, entries, asList(accessor, server1, server2)); + server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - if (rvv1 == null) { - assertThat(rvv2) - .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember() - + ", but does not on member " + bucketDump1.getMember()) - .isNull(); - } + // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" + final int workMillis = 30000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executeGets(entries, workMillis)), + server1.invokeAsync(() -> executePuts(entries, workMillis)), + accessor.invokeAsync(() -> executeGets(entries, workMillis)), + accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - if (rvv2 == null) { - assertThat(rvv1) - .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember() - + ", but does not on member " + bucketDump2.getMember()) - .isNull(); - } + // Retry the clear operation on the region until success (server2 will go down, but other + // members will eventually become primary for those buckets previously hosted by server2). 
+ executeClearWithRetry(getVM(coordinatorVM.vmNumber)); - assertThat(rvv1).isNotNull(); - assertThat(rvv2).isNotNull(); - Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members = - new HashMap<>(rvv1.getMemberToVersion()); - Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members = - new HashMap<>(rvv1.getMemberToVersion()); - for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) { - VersionSource<?> memberId = entry.getKey(); - RegionVersionHolder<?> versionHolder1 = entry.getValue(); - RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId); - assertThat(versionHolder1) - .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember() - + " is not consistent with member " + bucketDump2.getMember()) - .isEqualTo(versionHolder2); + // Wait for member to get back online. + server2.invoke(() -> { + cacheRule.createCache(); + initDataStore(RegionShortcut.PARTITION_REDUNDANT); + await().untilAsserted( + () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); + }); + + // Let asyncInvocations finish. + for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); } + + // Assert Region Buckets are consistent. + asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); + accessor.invoke(this::assertRegionBucketsConsistency); } /** - * Asserts that the region data is consistent across buckets. + * The test does the following (clear coordinator is chosen through parameters): + * - Populates the Partition Region. + * - Verifies that the entries are synchronized on all members. + * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a + * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so + * participates on the clear operation). + * - Launches two threads per VM to continuously execute gets, puts and removes for a given time. 
+ * - Clears the Partition Region (at this point the non-coordinator is restarted). + * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary + * buckets on the restarted members are not available). */ - private void assertRegionBucketsConsistency() throws ForceReattemptException { - PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); - // Redundant copies + 1 primary. - int expectedCopies = region.getRedundantCopies() + 1; + @Test + @Parameters({"SERVER1", "ACCESSOR"}) + @TestCaseName("[{index}] {method}(Coordinator:{0})") + public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced( + TestVM coordinatorVM) throws InterruptedException { + parametrizedSetup(RegionShortcut.PARTITION); + final int entries = 7500; + populateRegion(accessor, entries, asList(accessor, server1, server2)); + server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - for (int bId = 0; bId < BUCKETS; bId++) { - final int bucketId = bId; - List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId); - assertThat(bucketDumps.size()) - .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has " - + bucketDumps.size()) - .isEqualTo(expectedCopies); + // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" + final int workMillis = 30000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executeGets(entries, workMillis)), + server1.invokeAsync(() -> executePuts(entries, workMillis)), + accessor.invokeAsync(() -> executeGets(entries, workMillis)), + accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - // Check that all copies of the bucket have the same data. + // Clear the region. 
+ getVM(coordinatorVM.vmNumber).invoke(() -> { + assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) + .isInstanceOf(PartitionedRegionPartialClearException.class); + }); + + // Let asyncInvocations finish. + for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); + } + } + + /** + * The test does the following (clear coordinator is chosen through parameters): + * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a + * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so + * participates on the clear operation). + * - Launches one thread per VM to continuously execute putAll/removeAll for a given time. + * - Clears the Partition Region (at this point the non-coordinator is restarted). + * - Asserts that, after the clear has finished, the Region Buckets are consistent across members. + */ + @Test + @Parameters({"SERVER1", "ACCESSOR"}) + @TestCaseName("[{index}] {method}(Coordinator:{0})") + public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( + TestVM coordinatorVM) throws InterruptedException { + parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); + server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); + + // Let all VMs continuously execute putAll/removeAll for 30 seconds. + final int workMillis = 30000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), + accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); + + // Retry the clear operation on the region until success (server2 will go down, but other + // members will eventually become primary for those buckets previously hosted by server2). + executeClearWithRetry(getVM(coordinatorVM.vmNumber)); + + // Wait for member to get back online. 
+ server2.invoke(() -> { + cacheRule.createCache(); + initDataStore(RegionShortcut.PARTITION_REDUNDANT); + await().untilAsserted( + () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); + }); + + // Let asyncInvocations finish. + for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); + } + + // Assert Region Buckets are consistent. + asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); + accessor.invoke(this::assertRegionBucketsConsistency); + } + + /** + * The test does the following (clear coordinator is chosen through parameters): + * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a + * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so + * participates on the clear operation). + * - Launches one thread per VM to continuously execute putAll/removeAll for a given time. + * - Clears the Partition Region (at this point the non-coordinator is restarted). + * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary + * buckets on the restarted members are not available). + */ + @Test + @Parameters({"SERVER1", "ACCESSOR"}) + @TestCaseName("[{index}] {method}(Coordinator:{0})") + public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced( + TestVM coordinatorVM) throws InterruptedException { + parametrizedSetup(RegionShortcut.PARTITION); + server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); + + final int workMillis = 30000; + List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( + server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), + accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); + + // Clear the region. 
+ getVM(coordinatorVM.vmNumber).invoke(() -> { + assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) + .isInstanceOf(PartitionedRegionPartialClearException.class); + }); + + // Let asyncInvocations finish. + for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { + asyncInvocation.await(); + } + } + + private void initAccessor(RegionShortcut regionShortcut) { + PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .setLocalMaxMemory(0) + .create(); + + cacheRule.getCache().createRegionFactory(regionShortcut) + .setPartitionAttributes(attrs) + .create(REGION_NAME); + + } + + private void initDataStore(RegionShortcut regionShortcut) { + PartitionAttributes<String, String> attrs = new PartitionAttributesFactory<String, String>() + .setTotalNumBuckets(BUCKETS) + .create(); + + cacheRule.getCache().createRegionFactory(regionShortcut) + .setPartitionAttributes(attrs) + .create(REGION_NAME); + } + + private void parametrizedSetup(RegionShortcut regionShortcut) { + server1.invoke(() -> initDataStore(regionShortcut)); + server2.invoke(() -> initDataStore(regionShortcut)); + accessor.invoke(() -> initAccessor(regionShortcut)); + } + + private void waitForSilence() { + DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats(); + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + PartitionedRegionStats partitionedRegionStats = region.getPrStats(); + + await().untilAsserted(() -> { + assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0); + assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0); + 
assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0); + }); + } + + /** + * Populates the region and verifies the data on the selected VMs. + */ + private void populateRegion(VM feeder, int entryCount, Iterable<VM> vms) { + feeder.invoke(() -> { + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i)); + }); + + vms.forEach(vm -> vm.invoke(() -> { + waitForSilence(); + Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); + + IntStream.range(0, entryCount) + .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i)); + })); + } + + /** + * Asserts that the RegionVersionVectors for both buckets are consistent. + * + * @param bucketId Id of the bucket to compare. + * @param bucketDump1 First bucketDump. + * @param bucketDump2 Second bucketDump. + */ + private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1, + BucketDump bucketDump2) { + RegionVersionVector<?> rvv1 = bucketDump1.getRvv(); + RegionVersionVector<?> rvv2 = bucketDump2.getRvv(); + + if (rvv1 == null) { + assertThat(rvv2) + .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember() + + ", but does not on member " + bucketDump1.getMember()) + .isNull(); + } + + if (rvv2 == null) { + assertThat(rvv1) + .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember() + + ", but does not on member " + bucketDump2.getMember()) + .isNull(); + } + + assertThat(rvv1).isNotNull(); + assertThat(rvv2).isNotNull(); + Map<VersionSource<?>, RegionVersionHolder<?>> rvv2Members = + new HashMap<>(rvv1.getMemberToVersion()); + Map<VersionSource<?>, RegionVersionHolder<?>> rvv1Members = + new HashMap<>(rvv1.getMemberToVersion()); + for (Map.Entry<VersionSource<?>, RegionVersionHolder<?>> entry : rvv1Members.entrySet()) { + VersionSource<?> memberId = entry.getKey(); + 
RegionVersionHolder<?> versionHolder1 = entry.getValue(); + RegionVersionHolder<?> versionHolder2 = rvv2Members.remove(memberId); + assertThat(versionHolder1) + .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember() + + " is not consistent with member " + bucketDump2.getMember()) + .isEqualTo(versionHolder2); + } + } + + /** + * Asserts that the region data is consistent across buckets. + */ + private void assertRegionBucketsConsistency() throws ForceReattemptException { + PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME); + // Redundant copies + 1 primary. + int expectedCopies = region.getRedundantCopies() + 1; + + for (int bId = 0; bId < BUCKETS; bId++) { + final int bucketId = bId; + List<BucketDump> bucketDumps = region.getAllBucketEntries(bucketId); + assertThat(bucketDumps.size()) + .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has " + + bucketDumps.size()) + .isEqualTo(expectedCopies); + + // Check that all copies of the bucket have the same data. if (bucketDumps.size() > 1) { BucketDump firstDump = bucketDumps.get(0); @@ -375,291 +634,6 @@ public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements } } - /** - * The test does the following (clear coordinator and regionType are parametrized): - * - Launches one thread per VM to continuously execute removes, puts and gets for a given time. - * - Clears the Partition Region continuously every X milliseconds for a given time. - * - Asserts that, after the clears have finished, the Region Buckets are consistent across - * members. - */ - @Test - @TestCaseName(TEST_CASE_NAME) - @Parameters(method = "coordinatorsAndRegionTypes") - public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM, - RegionShortcut regionShortcut) throws InterruptedException { - parametrizedSetup(regionShortcut); - - // Let all VMs continuously execute puts and gets for 60 seconds. 
- final int workMillis = 60000; - final int entries = 15000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePuts(entries, workMillis)), - server2.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - - // Clear the region every second for 60 seconds. - getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 1000)); - - // Let asyncInvocations finish. - for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - - // Assert Region Buckets are consistent. - asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); - accessor.invoke(this::assertRegionBucketsConsistency); - } - - /** - * The test does the following (clear coordinator and regionType are parametrized): - * - Launches two threads per VM to continuously execute putAll and removeAll for a given time. - * - Clears the Partition Region continuously every X milliseconds for a given time. - * - Asserts that, after the clears have finished, the Region Buckets are consistent across - * members. - */ - @Test - @TestCaseName(TEST_CASE_NAME) - @Parameters(method = "coordinatorsAndRegionTypes") - public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM, - RegionShortcut regionShortcut) throws InterruptedException { - parametrizedSetup(regionShortcut); - - // Let all VMs continuously execute putAll and removeAll for 15 seconds. 
- final int workMillis = 15000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 2000, workMillis)), - server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workMillis)), - server2.invokeAsync(() -> executePutAlls(2000, 4000, workMillis)), - server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis)), - accessor.invokeAsync(() -> executePutAlls(4000, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workMillis))); - - // Clear the region every half second for 15 seconds. - getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workMillis, 500)); - - // Let asyncInvocations finish. - for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - - // Assert Region Buckets are consistent. - asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); - accessor.invoke(this::assertRegionBucketsConsistency); - } - - /** - * The test does the following (regionType is parametrized): - * - Populates the Partition Region. - * - Verifies that the entries are synchronized on all members. - * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop the - * coordinator VM while the clear is in progress. - * - Clears the Partition Region (at this point the coordinator is restarted). - * - Asserts that, after the member joins again, the Region Buckets are consistent. 
- */ - @Test - @TestCaseName("[{index}] {method}(RegionType:{0})") - @Parameters(method = "regionTypes") - public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) { - parametrizedSetup(regionShortcut); - final int entries = 1000; - populateRegion(accessor, entries, asList(accessor, server1, server2)); - - // Set the CoordinatorMemberKiller and try to clear the region - server1.invoke(() -> { - DistributionMessageObserver.setInstance(new MemberKiller(true)); - Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME); - assertThatThrownBy(region::clear) - .isInstanceOf(DistributedSystemDisconnectedException.class) - .hasCauseInstanceOf(ForcedDisconnectException.class); - }); - - // Wait for member to get back online and assign all buckets. - server1.invoke(() -> { - cacheRule.createCache(); - initDataStore(regionShortcut); - await().untilAsserted( - () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); - PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME)); - }); - - // Assert Region Buckets are consistent. - asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); - accessor.invoke(this::assertRegionBucketsConsistency); - } - - /** - * The test does the following (clear coordinator is chosen through parameters): - * - Populates the Partition Region. - * - Verifies that the entries are synchronized on all members. - * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a - * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so - * participates on the clear operation). - * - Launches two threads per VM to continuously execute gets, puts and removes for a given time. - * - Clears the Partition Region (at this point the non-coordinator is restarted). - * - Asserts that, after the clear has finished, the Region Buckets are consistent across members. 
- */ - @Test - @Parameters(method = "coordinators") - @TestCaseName("[{index}] {method}(Coordinator:{0})") - public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( - TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); - final int entries = 7500; - populateRegion(accessor, entries, asList(accessor, server1, server2)); - server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - - // Let all VMs (except the one to kill) continuously execute gets, put and removes for 30" - final int workMillis = 30000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executeGets(entries, workMillis)), - server1.invokeAsync(() -> executePuts(entries, workMillis)), - accessor.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - - // Retry the clear operation on the region until success (server2 will go down, but other - // members will eventually become primary for those buckets previously hosted by server2). - executeClearWithRetry(getVM(coordinatorVM.vmNumber)); - - // Wait for member to get back online. - server2.invoke(() -> { - cacheRule.createCache(); - initDataStore(RegionShortcut.PARTITION_REDUNDANT); - await().untilAsserted( - () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); - }); - - // Let asyncInvocations finish. - for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - - // Assert Region Buckets are consistent. - asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); - accessor.invoke(this::assertRegionBucketsConsistency); - } - - /** - * The test does the following (clear coordinator is chosen through parameters): - * - Populates the Partition Region. 
- * - Verifies that the entries are synchronized on all members. - * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a - * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so - * participates in the clear operation). - * - Launches two threads per VM to continuously execute gets, puts and removes for a given time. - * - Clears the Partition Region (at this point the non-coordinator is restarted). - * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary - * buckets on the restarted members are not available). - */ - @Test - @Parameters(method = "coordinators") - @TestCaseName("[{index}] {method}(Coordinator:{0})") - public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced( - TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION); - final int entries = 7500; - populateRegion(accessor, entries, asList(accessor, server1, server2)); - server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - - // Let all VMs (except the one to kill) continuously execute gets, puts and removes for 30 seconds - final int workMillis = 30000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executeGets(entries, workMillis)), - server1.invokeAsync(() -> executePuts(entries, workMillis)), - accessor.invokeAsync(() -> executeGets(entries, workMillis)), - accessor.invokeAsync(() -> executeRemoves(entries, workMillis))); - - // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) - .isInstanceOf(PartitionedRegionPartialClearException.class); - }); - - // Let asyncInvocations finish. 
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - } - - /** - * The test does the following (clear coordinator is chosen through parameters): - * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a - * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so - * participates in the clear operation). - * - Launches one thread per VM to continuously execute putAll/removeAll for a given time. - * - Clears the Partition Region (at this point the non-coordinator is restarted). - * - Asserts that, after the clear has finished, the Region Buckets are consistent across members. - */ - @Test - @Parameters(method = "coordinators") - @TestCaseName("[{index}] {method}(Coordinator:{0})") - public void clearOnRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced( - TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT); - server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - - // Let all VMs continuously execute putAll/removeAll for 30 seconds. - final int workMillis = 30000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); - - // Retry the clear operation on the region until success (server2 will go down, but other - // members will eventually become primary for those buckets previously hosted by server2). - executeClearWithRetry(getVM(coordinatorVM.vmNumber)); - - // Wait for member to get back online. - server2.invoke(() -> { - cacheRule.createCache(); - initDataStore(RegionShortcut.PARTITION_REDUNDANT); - await().untilAsserted( - () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull()); - }); - - // Let asyncInvocations finish. 
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - - // Assert Region Buckets are consistent. - asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence)); - accessor.invoke(this::assertRegionBucketsConsistency); - } - - /** - * The test does the following (clear coordinator is chosen through parameters): - * - Sets the {@link MemberKiller} as a {@link DistributionMessageObserver} to stop a - * non-coordinator VM while the clear is in progress (the member has primary buckets, though, so - * participates in the clear operation). - * - Launches one thread per VM to continuously execute putAll/removeAll for a given time. - * - Clears the Partition Region (at this point the non-coordinator is restarted). - * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary - * buckets on the restarted members are not available). - */ - @Test - @Parameters(method = "coordinators") - @TestCaseName("[{index}] {method}(Coordinator:{0})") - public void clearOnNonRedundantPartitionRegionWithConcurrentPutAllRemoveAllShouldFailWhenNonCoordinatorMembersAreBounced( - TestVM coordinatorVM) throws InterruptedException { - parametrizedSetup(RegionShortcut.PARTITION); - server2.invoke(() -> DistributionMessageObserver.setInstance(new MemberKiller(false))); - - final int workMillis = 30000; - List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList( - server1.invokeAsync(() -> executePutAlls(0, 6000, workMillis)), - accessor.invokeAsync(() -> executeRemoveAlls(2000, 4000, workMillis))); - - // Clear the region. - getVM(coordinatorVM.vmNumber).invoke(() -> { - assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear()) - .isInstanceOf(PartitionedRegionPartialClearException.class); - }); - - // Let asyncInvocations finish. 
- for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) { - asyncInvocation.await(); - } - } - private enum TestVM { ACCESSOR(0), SERVER1(1), SERVER2(2);