[ https://issues.apache.org/jira/browse/GEODE-7670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157014#comment-17157014 ]

ASF GitHub Bot commented on GEODE-7670:
---------------------------------------

gesterzhou commented on a change in pull request #4848:
URL: https://github.com/apache/geode/pull/4848#discussion_r453976769



##########
File path: geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearWithConcurrentOperationsDUnitTest.java
##########
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.apache.geode.internal.util.ArrayUtils.asList;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.Serializable;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.IntStream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import junitparams.naming.TestCaseName;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.PartitionedRegionPartialClearException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.DMStats;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.api.MembershipManagerHelper;
+import org.apache.geode.internal.cache.versions.RegionVersionHolder;
+import org.apache.geode.internal.cache.versions.RegionVersionVector;
+import org.apache.geode.internal.cache.versions.VersionSource;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+
+/**
+ * Tests to verify that {@link PartitionedRegion#clear()} operation can be executed multiple times
+ * on the same region while other cache operations are being executed concurrently and members are
+ * added or removed.
+ */
+@RunWith(JUnitParamsRunner.class)
+public class PartitionedRegionClearWithConcurrentOperationsDUnitTest implements Serializable {
+  private static final Integer BUCKETS = 13;
+  private static final String REGION_NAME = "PartitionedRegion";
+  private static final String TEST_CASE_NAME =
+      "[{index}] {method}(Coordinator:{0}, RegionType:{1})";
+
+  @Rule
+  public DistributedRule distributedRule = new DistributedRule(3);
+
+  @Rule
+  public CacheRule cacheRule = CacheRule.builder().createCacheInAll().build();
+
+  private VM accessor, server1, server2;
+
+  private enum TestVM {
+    ACCESSOR(0), SERVER1(1), SERVER2(2);
+
+    final int vmNumber;
+
+    TestVM(int vmNumber) {
+      this.vmNumber = vmNumber;
+    }
+  }
+
+  @SuppressWarnings("unused")
+  static RegionShortcut[] regionTypes() {
+    return new RegionShortcut[] {
+        RegionShortcut.PARTITION, RegionShortcut.PARTITION_REDUNDANT
+    };
+  }
+
+  @SuppressWarnings("unused")
+  static TestVM[] coordinators() {
+    return new TestVM[] {
+        TestVM.SERVER1, TestVM.ACCESSOR
+    };
+  }
+
+  @SuppressWarnings("unused")
+  static Object[] coordinatorsAndRegionTypes() {
+    ArrayList<Object[]> parameters = new ArrayList<>();
+    RegionShortcut[] regionShortcuts = regionTypes();
+
+    Arrays.stream(regionShortcuts).forEach(regionShortcut -> {
+      parameters.add(new Object[] {TestVM.SERVER1, regionShortcut});
+      parameters.add(new Object[] {TestVM.ACCESSOR, regionShortcut});
+    });
+
+    return parameters.toArray();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    server1 = getVM(TestVM.SERVER1.vmNumber);
+    server2 = getVM(TestVM.SERVER2.vmNumber);
+    accessor = getVM(TestVM.ACCESSOR.vmNumber);
+  }
+
+  private void initAccessor(RegionShortcut regionShortcut) {
+    @SuppressWarnings("rawtypes")
+    PartitionAttributes attributes = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .setLocalMaxMemory(0)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attributes)
+        .create(REGION_NAME);
+  }
+
+  private void initDataStore(RegionShortcut regionShortcut) {
+    @SuppressWarnings("rawtypes")
+    PartitionAttributes attributes = new PartitionAttributesFactory<String, String>()
+        .setTotalNumBuckets(BUCKETS)
+        .create();
+
+    cacheRule.getCache().createRegionFactory(regionShortcut)
+        .setPartitionAttributes(attributes)
+        .create(REGION_NAME);
+  }
+
+  private void parametrizedSetup(RegionShortcut regionShortcut) {
+    server1.invoke(() -> initDataStore(regionShortcut));
+    server2.invoke(() -> initDataStore(regionShortcut));
+    accessor.invoke(() -> initAccessor(regionShortcut));
+  }
+
+  private void waitForSilence() {
+    DMStats dmStats = cacheRule.getSystem().getDistributionManager().getStats();
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    PartitionedRegionStats partitionedRegionStats = region.getPrStats();
+
+    await().untilAsserted(() -> {
+      assertThat(dmStats.getReplyWaitsInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getVolunteeringInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getPrimaryTransfersInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalanceBucketCreatesInProgress()).isEqualTo(0);
+      assertThat(partitionedRegionStats.getRebalancePrimaryTransfersInProgress()).isEqualTo(0);
+    });
+  }
+
+  /**
+   * Populates the region and verifies the data on the selected VMs.
+   */
+  private void populateRegion(VM feeder, int entryCount, List<VM> vms) {
+    feeder.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      IntStream.range(0, entryCount).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+    });
+
+    vms.forEach(vm -> vm.invoke(() -> {
+      waitForSilence();
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+
+      IntStream.range(0, entryCount)
+          .forEach(i -> assertThat(region.get(String.valueOf(i))).isEqualTo("Value_" + i));
+    }));
+  }
+
+  /**
+   * Asserts that the RegionVersionVectors for both buckets are consistent.
+   *
+   * @param bucketId Id of the bucket to compare.
+   * @param bucketDump1 First bucketDump.
+   * @param bucketDump2 Second bucketDump.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private void assertRegionVersionVectorsConsistency(int bucketId, BucketDump bucketDump1,
+      BucketDump bucketDump2) {
+    RegionVersionVector rvv1 = bucketDump1.getRvv();
+    RegionVersionVector rvv2 = bucketDump2.getRvv();
+
+    if (rvv1 == null) {
+      assertThat(rvv2)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump2.getMember()
+              + ", but does not on member " + bucketDump1.getMember())
+          .isNull();
+    }
+
+    if (rvv2 == null) {
+      assertThat(rvv1)
+          .as("Bucket " + bucketId + " has an RVV on member " + bucketDump1.getMember()
+              + ", but does not on member " + bucketDump2.getMember())
+          .isNull();
+    }
+
+    assertThat(rvv1).isNotNull();
+    assertThat(rvv2).isNotNull();
+    Map<VersionSource, RegionVersionHolder> rvv2Members =
+        new HashMap<VersionSource, RegionVersionHolder>(rvv2.getMemberToVersion());
+    Map<VersionSource, RegionVersionHolder> rvv1Members =
+        new HashMap<VersionSource, RegionVersionHolder>(rvv1.getMemberToVersion());
+    for (Map.Entry<VersionSource, RegionVersionHolder> entry : rvv1Members.entrySet()) {
+      VersionSource memberId = entry.getKey();
+      RegionVersionHolder versionHolder1 = entry.getValue();
+      RegionVersionHolder versionHolder2 = rvv2Members.remove(memberId);
+      assertThat(versionHolder1)
+          .as("RegionVersionVector for bucket " + bucketId + " on member " + bucketDump1.getMember()
+              + " is not consistent with member " + bucketDump2.getMember())
+          .isEqualTo(versionHolder2);
+    }
+  }
+
+  /**
+   * Asserts that the region data is consistent across buckets.
+   */
+  private void assertRegionBucketsConsistency() throws ForceReattemptException {
+    List<BucketDump> bucketDumps;
+    PartitionedRegion region = (PartitionedRegion) cacheRule.getCache().getRegion(REGION_NAME);
+    // Redundant copies + 1 primary.
+    int expectedCopies = region.getRedundantCopies() + 1;
+
+    for (int bId = 0; bId < BUCKETS; bId++) {
+      final int bucketId = bId;
+      bucketDumps = region.getAllBucketEntries(bucketId);
+      assertThat(bucketDumps.size())
+          .as("Bucket " + bucketId + " should have " + expectedCopies + " copies, but has "
+              + bucketDumps.size())
+          .isEqualTo(expectedCopies);
+
+      // Check that all copies of the bucket have the same data.
+      if (bucketDumps.size() > 1) {
+        BucketDump firstDump = bucketDumps.get(0);
+
+        for (int j = 1; j < bucketDumps.size(); j++) {
+          BucketDump otherDump = bucketDumps.get(j);
+          assertRegionVersionVectorsConsistency(bucketId, firstDump, otherDump);
+
+          await().untilAsserted(() -> assertThat(otherDump.getValues())
+              .as("Values for bucket " + bucketId + " on member " + otherDump.getMember()
+                  + " are not consistent with member " + firstDump.getMember())
+              .isEqualTo(firstDump.getValues()));
+
+          await().untilAsserted(() -> assertThat(otherDump.getVersions())
+              .as("Versions for bucket " + bucketId + " on member " + otherDump.getMember()
+                  + " are not consistent with member " + firstDump.getMember())
+              .isEqualTo(firstDump.getVersions()));
+        }
+      }
+    }
+  }
+
+  /**
+   * Continuously execute get operations on the PartitionedRegion for the given durationInSeconds.
+   */
+  private void executeGets(final int numEntries, final long durationInSeconds) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      // Region might have been cleared in between; that's why we check for null.
+      IntStream.range(0, numEntries).forEach(i -> {
+        Optional<String> nullableValue = Optional.ofNullable(region.get(String.valueOf(i)));
+        nullableValue.ifPresent(value -> assertThat(value).isEqualTo("Value_" + i));
+      });
+    }
+  }
+
+  /**
+   * Continuously execute put operations on the PartitionedRegion for the given durationInSeconds.
+   */
+  private void executePuts(final int numEntries, final long durationInSeconds) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      IntStream.range(0, numEntries).forEach(i -> region.put(String.valueOf(i), "Value_" + i));
+    }
+  }
+
+  /**
+   * Continuously execute putAll operations on the PartitionedRegion for the given
+   * durationInSeconds.
+   */
+  private void executePutAlls(final int start, final int finish, final long durationInSeconds) {
+    Cache cache = cacheRule.getCache();
+    Map<String, String> valuesToInsert = new HashMap<>();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    IntStream.range(start, finish)
+        .forEach(i -> valuesToInsert.put(String.valueOf(i), "Value_" + i));
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      region.putAll(valuesToInsert);
+    }
+  }
+
+  /**
+   * Continuously execute remove operations on the PartitionedRegion for the given
+   * durationInSeconds.
+   */
+  private void executeRemoves(final int numEntries, final long durationInSeconds) {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      // Region might have been cleared in between; that's why we check for null.
+      IntStream.range(0, numEntries).forEach(i -> {
+        Optional<String> nullableValue = Optional.ofNullable(region.remove(String.valueOf(i)));
+        nullableValue.ifPresent(value -> assertThat(value).isEqualTo("Value_" + i));
+      });
+    }
+  }
+
+  /**
+   * Continuously execute removeAll operations on the PartitionedRegion for the given
+   * durationInSeconds.
+   */
+  private void executeRemoveAlls(final int start, final int finish, final long durationInSeconds) {
+    Cache cache = cacheRule.getCache();
+    List<String> keysToRemove = new ArrayList<>();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    IntStream.range(start, finish).forEach(i -> keysToRemove.add(String.valueOf(i)));
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      region.removeAll(keysToRemove);
+    }
+  }
+
+  /**
+   * Execute the clear operation and retry until success.
+   */
+  private void executeClearWithRetry(VM coordinator) {
+    coordinator.invoke(() -> {
+      boolean retry;
+
+      do {
+        retry = false;
+
+        try {
+          cacheRule.getCache().getRegion(REGION_NAME).clear();
+        } catch (PartitionedRegionPartialClearException pce) {
+          retry = true;
+        }
+      } while (retry);
+    });
+  }
+
+  /**
+   * Continuously execute clear operations on the PartitionedRegion for the given durationInSeconds.
+   */
+  private void executeClears(final long durationInSeconds, final long waitTimeInMilliseconds)
+      throws InterruptedException {
+    Cache cache = cacheRule.getCache();
+    Region<String, String> region = cache.getRegion(REGION_NAME);
+    Instant finishTime = Instant.now().plusSeconds(durationInSeconds);
+
+    while (Instant.now().isBefore(finishTime)) {
+      region.clear();
+      Thread.sleep(waitTimeInMilliseconds);
+    }
+  }
+
+  /**
+   * Register the MemberKiller CacheWriter on the given vms and cancel auto-reconnects.
+   */
+  private void registerVMKillerAsCacheWriter(List<VM> vmsToBounce) {
+    vmsToBounce.forEach(vm -> vm.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      region.getAttributesMutator().setCacheWriter(new MemberKiller());
+    }));
+  }
+
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Launches one thread per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "coordinatorsAndRegionTypes")
+  public void clearWithConcurrentPutGetRemoveShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    final int entries = 15000;
+    final int workSeconds = 60;
+    parametrizedSetup(regionShortcut);
+
+    // Let all VMs continuously execute puts, gets and removes for 60 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePuts(entries, workSeconds)),
+        server2.invokeAsync(() -> executeGets(entries, workSeconds)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workSeconds)));
+
+    // Clear the region every second for 60 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workSeconds, 1000));
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator and regionType are parametrized):
+   * - Launches two threads per VM to continuously execute putAll and removeAll for a given time.
+   * - Clears the Partition Region continuously every X milliseconds for a given time.
+   * - Asserts that, after the clears have finished, the Region Buckets are consistent across
+   * members.
+   */
+  @Test
+  @TestCaseName(TEST_CASE_NAME)
+  @Parameters(method = "coordinatorsAndRegionTypes")
+  public void clearWithConcurrentPutAllRemoveAllShouldWorkCorrectly(TestVM coordinatorVM,
+      RegionShortcut regionShortcut) throws InterruptedException {
+    final int workSeconds = 15;
+    parametrizedSetup(regionShortcut);
+
+    // Let all VMs continuously execute putAll and removeAll for 15 seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executePutAlls(0, 2000, workSeconds)),
+        server1.invokeAsync(() -> executeRemoveAlls(0, 2000, workSeconds)),
+        server2.invokeAsync(() -> executePutAlls(2000, 4000, workSeconds)),
+        server2.invokeAsync(() -> executeRemoveAlls(2000, 4000, workSeconds)),
+        accessor.invokeAsync(() -> executePutAlls(4000, 6000, workSeconds)),
+        accessor.invokeAsync(() -> executeRemoveAlls(4000, 6000, workSeconds)));
+
+    // Clear the region every half second for 15 seconds.
+    getVM(coordinatorVM.vmNumber).invoke(() -> executeClears(workSeconds, 500));
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (regionType is parametrized):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link CacheWriter} to stop the coordinator VM while the
+   * clear is in progress.
+   * - Clears the Partition Region (at this point the coordinator is restarted).
+   * - Asserts that, after the member joins again, the Region Buckets are consistent.
+   */
+  @Test
+  @TestCaseName("[{index}] {method}(RegionType:{0})")
+  @Parameters(method = "regionTypes")
+  public void clearShouldFailWhenCoordinatorMemberIsBounced(RegionShortcut regionShortcut) {
+    final int entries = 1000;
+    parametrizedSetup(regionShortcut);
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+    registerVMKillerAsCacheWriter(Collections.singletonList(server1));
+
+    // Clear the region.
+    server1.invoke(() -> {
+      Region<String, String> region = cacheRule.getCache().getRegion(REGION_NAME);
+      assertThatThrownBy(region::clear)
+          .isInstanceOf(DistributedSystemDisconnectedException.class)
+          .hasCauseInstanceOf(ForcedDisconnectException.class);
+    });
+
+    // Wait for member to get back online and assign all buckets.
+    server1.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(regionShortcut);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+      PartitionRegionHelper.assignBucketsToPartitions(cacheRule.getCache().getRegion(REGION_NAME));
+    });
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link CacheWriter} to stop a non-coordinator VM while
+   * the clear is in progress (the member has primary buckets, though, so it participates in the
+   * clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that, after the clear has finished, the Region Buckets are consistent across members.
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnRedundantPartitionRegionWithConcurrentPutGetRemoveShouldWorkCorrectlyWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int entries = 7500;
+    final int workSeconds = 60;
+    parametrizedSetup(RegionShortcut.PARTITION_REDUNDANT);
+    registerVMKillerAsCacheWriter(Collections.singletonList(server2));
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+
+    // Let all VMs (except the one to kill) continuously execute gets, puts and removes for 60
+    // seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workSeconds)),
+        server1.invokeAsync(() -> executePuts(entries, workSeconds)),
+        accessor.invokeAsync(() -> executeGets(entries, workSeconds)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workSeconds)));
+
+    // Retry the clear operation on the region until success (server2 will go down, but other
+    // members will become primary for those buckets previously hosted by server2).
+    executeClearWithRetry(getVM(coordinatorVM.vmNumber));
+
+    // Wait for member to get back online.
+    server2.invoke(() -> {
+      cacheRule.createCache();
+      initDataStore(RegionShortcut.PARTITION_REDUNDANT);
+      await().untilAsserted(
+          () -> assertThat(InternalDistributedSystem.getConnectedInstance()).isNotNull());
+    });
+
+    // Let asyncInvocations finish.
+    for (AsyncInvocation<Void> asyncInvocation : asyncInvocationList) {
+      asyncInvocation.await();
+    }
+
+    // Assert Region Buckets are consistent.
+    asList(accessor, server1, server2).forEach(vm -> vm.invoke(this::waitForSilence));
+    accessor.invoke(this::assertRegionBucketsConsistency);
+  }
+
+  /**
+   * The test does the following (clear coordinator is chosen through parameters):
+   * - Populates the Partition Region.
+   * - Verifies that the entries are synchronized on all members.
+   * - Sets the {@link MemberKiller} as a {@link CacheWriter} to stop a non-coordinator VM while
+   * the clear is in progress (the member has primary buckets, though, so it participates in the
+   * clear operation).
+   * - Launches two threads per VM to continuously execute gets, puts and removes for a given time.
+   * - Clears the Partition Region (at this point the non-coordinator is restarted).
+   * - Asserts that the clear operation failed with PartitionedRegionPartialClearException (primary
+   * buckets on the restarted members are not available).
+   */
+  @Test
+  @Parameters(method = "coordinators")
+  @TestCaseName("[{index}] {method}(Coordinator:{0})")
+  public void clearOnNonRedundantPartitionRegionWithConcurrentPutGetRemoveShouldFailWhenNonCoordinatorMembersAreBounced(
+      TestVM coordinatorVM) throws InterruptedException {
+    final int entries = 7500;
+    final int workSeconds = 45;
+    parametrizedSetup(RegionShortcut.PARTITION);
+    registerVMKillerAsCacheWriter(Collections.singletonList(server2));
+    populateRegion(accessor, entries, asList(accessor, server1, server2));
+
+    // Let all VMs (except the one to kill) continuously execute gets, puts and removes for 45
+    // seconds.
+    List<AsyncInvocation<Void>> asyncInvocationList = Arrays.asList(
+        server1.invokeAsync(() -> executeGets(entries, workSeconds)),
+        server1.invokeAsync(() -> executePuts(entries, workSeconds)),
+        accessor.invokeAsync(() -> executeGets(entries, workSeconds)),
+        accessor.invokeAsync(() -> executeRemoves(entries, workSeconds)));
+
+    // Clear the region.
+    getVM(coordinatorVM.vmNumber).invoke(() -> {
+      assertThatThrownBy(() -> cacheRule.getCache().getRegion(REGION_NAME).clear())

Review comment:
       The PartitionedRegionPartialClearException could happen here, but it is not guaranteed to be thrown every time, so this assertion could fail intermittently.
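
       A more tolerant version of that step (a minimal sketch, not taken from the patch; it reuses the test's own cacheRule, REGION_NAME and the already-imported PartitionedRegionPartialClearException, and accepts either outcome instead of requiring the exception) could look like:

           getVM(coordinatorVM.vmNumber).invoke(() -> {
             try {
               // The clear may succeed or fail partially, depending on when server2 is killed.
               cacheRule.getCache().getRegion(REGION_NAME).clear();
             } catch (PartitionedRegionPartialClearException expected) {
               // Acceptable: primary buckets hosted by the bounced member were unavailable mid-clear.
             }
           });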


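For context, the MemberKiller class used throughout these tests lies outside the quoted portion of the diff. Judging from the imports in the file (CacheWriterAdapter, MembershipManagerHelper, ForcedDisconnectException), a plausible sketch of such a writer, offered only as an illustration rather than as the patch's actual implementation, is:

    public static class MemberKiller extends CacheWriterAdapter<String, String> {
      @Override
      public void beforeRegionClear(RegionEvent<String, String> event) throws CacheWriterException {
        // Forcibly disconnect this member as soon as the clear reaches it, simulating a crash.
        // The caller then sees a ForcedDisconnectException wrapped in a
        // DistributedSystemDisconnectedException, as asserted in the tests above.
        MembershipManagerHelper.crashDistributedSystem(InternalDistributedSystem.getConnectedInstance());
      }
    }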


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Partitioned Region clear operations can occur during concurrent data 
> operations
> -------------------------------------------------------------------------------
>
>                 Key: GEODE-7670
>                 URL: https://issues.apache.org/jira/browse/GEODE-7670
>             Project: Geode
>          Issue Type: Sub-task
>          Components: regions
>            Reporter: Nabarun Nag
>            Assignee: Juan Ramos
>            Priority: Major
>              Labels: GeodeCommons
>          Time Spent: 5.5h
>  Remaining Estimate: 0h
>
> Clear operations are successful when concurrent read/write operations occur.
> Ensure there is test coverage for this use case and modify the code as needed
> to enable this.
> Acceptance:
>  * Passing DUnit tests where clear operations are successful on a partitioned
> region with:
>  * concurrent puts (writes) and clear op
>  * concurrent gets (reads) and clear op
>  * Test coverage for when a member departs in this scenario
>  * Test coverage for when a member restarts in this scenario
>  * Unit tests with complete code coverage for the newly written code.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
