This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-7682-2
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 1573e3de4ad4a2788127bacf612766685af35395
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Fri Feb 28 23:24:45 2020 -0800

    GEODE-7682: add clear API for PR
---
 .../cache/PartitionedRegionClearDUnitTest.java     | 133 +++++++++++++
 ...itionedRegionSingleNodeOperationsJUnitTest.java |   7 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |   6 +-
 .../org/apache/geode/internal/DSFIDFactory.java    |   3 +
 .../geode/internal/cache/DistributedRegion.java    |   9 -
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +
 .../geode/internal/cache/PartitionedRegion.java    | 208 +++++++++++++++++++--
 .../geode/internal/cache/RegionEventImpl.java      |   5 +
 .../internal/cache/partitioned/ClearPRMessage.java |  76 +++-----
 .../cache/partitioned/ClearPRMessageTest.java      |  60 ++----
 10 files changed, 396 insertions(+), 121 deletions(-)
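For context, a minimal usage sketch (not part of this commit) of the behavior the change enables, mirroring the new PartitionedRegionClearDUnitTest: clear() on a PARTITION region now clears the region bucket-by-bucket instead of throwing UnsupportedOperationException. The "cache" variable below is assumed to be an already-running server-side Cache; names and values are illustrative only.

    // Hypothetical illustration only; "cache" is assumed to be a server-side Cache.
    Region<Integer, String> region =
        cache.<Integer, String>createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
            .create("testPR");
    IntStream.range(0, 1000).forEach(i -> region.put(i, "value" + i));
    region.clear(); // previously threw UnsupportedOperationException for a PR
    assertThat(region.size()).isEqualTo(0);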

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
new file mode 100644
index 0000000..fd1a10b
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.stream.IntStream;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+
+public class PartitionedRegionClearDUnitTest implements Serializable {
+  protected static final String REGION_NAME = "testPR";
+  protected static final int NUM_ENTRIES = 1000;
+
+  protected int locatorPort;
+  protected MemberVM locator;
+  protected MemberVM dataStore1, dataStore2, accessor;
+  protected ClientVM client1, client2;
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule(6);
+
+  @Before
+  public void setUp() throws Exception {
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    dataStore1 = cluster.startServerVM(1, locatorPort);
+    dataStore2 = cluster.startServerVM(2, locatorPort);
+    accessor = cluster.startServerVM(3, locatorPort);
+    client1 = cluster.startClientVM(4, c -> c.withLocatorConnection((locatorPort)));
+    client2 = cluster.startClientVM(5, c -> c.withLocatorConnection((locatorPort)));
+
+    prepare();
+  }
+
+  private void prepare() {
+    dataStore1.invoke(this::initDataStore);
+    dataStore2.invoke(this::initDataStore);
+    accessor.invoke(this::initAccessor);
+
+    accessor.invoke(this::insertEntries);
+    dataStore1.invoke(this::verifyDataIsLoaded);
+    dataStore2.invoke(this::verifyDataIsLoaded);
+  }
+
+  private void verifyDataIsLoaded() {
+    Region region = getCache().getRegion(REGION_NAME);
+    assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+  }
+
+  public Cache getCache() {
+    Cache cache = ClusterStartupRule.getCache();
+    assertThat(cache).isNotNull();
+
+    return cache;
+  }
+
+  private void initDataStore() {
+    Cache cache = getCache();
+    cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
+        .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create())
+        .create(REGION_NAME);
+  }
+
+  private void initAccessor() {
+    Cache cache = getCache();
+    cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT)
+        .setPartitionAttributes(
+            new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create())
+        .create(REGION_NAME);
+  }
+
+  private void insertEntries() {
+    Cache cache = getCache();
+    Region region = cache.getRegion(REGION_NAME);
+    IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i));
+    assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+  }
+
+  @Test
+  public void normalClearFromDataStore() {
+    dataStore1.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+
+      region.clear();
+      assertThat(region.size()).isEqualTo(0);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(0);
+    });
+  }
+
+  @Test
+  public void normalClearFromAccessor() {
+    accessor.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(NUM_ENTRIES);
+      region.clear();
+
+      assertThat(region.size()).isEqualTo(0);
+    });
+    dataStore2.invoke(() -> {
+      Region region = getCache().getRegion(REGION_NAME);
+      assertThat(region.size()).isEqualTo(0);
+    });
+  }
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
index e8de2b5..efda321 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/PartitionedRegionSingleNodeOperationsJUnitTest.java
@@ -1309,12 +1309,7 @@ public class PartitionedRegionSingleNodeOperationsJUnitTest {
       pr.put(new Integer(3), "three");
       pr.getEntry("key");
 
-      try {
-        pr.clear();
-        fail(
-            "PartitionedRegionSingleNodeOperationTest:testUnSupportedOps() 
operation failed on a blank PartitionedRegion");
-      } catch (UnsupportedOperationException expected) {
-      }
+      pr.clear();
 
       // try {
       // pr.entries(true);
diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index d6806f2..cacc46b 100644
--- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1436,8 +1436,8 @@ fromData,27
 toData,27
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2
-fromData,30
-toData,44
+fromData,19
+toData,36
 
 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2
 fromData,17
@@ -2101,4 +2101,4 @@ toData,105
 
 org/apache/geode/pdx/internal/PdxType,2
 fromData,109
-toData,124
\ No newline at end of file
+toData,124
diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 81ee359..36d4bd5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -284,6 +284,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe;
 import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage;
 import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage;
 import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
@@ -972,6 +973,8 @@ public class DSFIDFactory implements DataSerializableFixedID {
     serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY,
         GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class);
     serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class);
+    serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class);
+    serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class);
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index 1a62919..900d85e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
   @MutableForTesting
   public static boolean ignoreReconnect = false;
 
-  /**
-   * Lock to prevent multiple threads on this member from performing a clear at the same time.
-   */
-  private final Object clearLock = new Object();
   private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true);
 
   @MakeNotStatic
@@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
     }
   }
 
-  private void lockCheckReadiness() {
-    cache.getCancelCriterion().checkCancelInProgress(null);
-    checkReadiness();
-  }
-
   @Override
   Object validatedDestroy(Object key, EntryEventImpl event)
       throws TimeoutException, EntryNotFoundException, CacheWriterException {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 6a7e2d2..d5f9156 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   private final Lock clientMetaDataLock = new ReentrantLock();
 
   /**
+   * Lock to prevent multiple threads on this member from performing a clear at the same time.
+   */
+  protected final Object clearLock = new Object();
+
+  /**
    * Lock for updating the cache service profile for the region.
    */
   private final Lock cacheServiceProfileUpdateLock = new ReentrantLock();
@@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     checkRegionDestroyed(true);
   }
 
+  protected void lockCheckReadiness() {
+    cache.getCancelCriterion().checkCancelInProgress(null);
+    checkReadiness();
+  }
+
   /**
    * This method should be called when the caller cannot locate an entry and that condition is
    * unexpected. This will first double check the cache and region state before throwing an
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 2c1ec04..85310df 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -179,6 +179,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
 import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender;
 import org.apache.geode.internal.cache.ha.ThreadIdentifier;
+import org.apache.geode.internal.cache.partitioned.ClearPRMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage;
 import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse;
 import org.apache.geode.internal.cache.partitioned.DestroyMessage;
@@ -2144,18 +2145,197 @@ public class PartitionedRegion extends LocalRegion
     throw new UnsupportedOperationException();
   }
 
-  /**
-   * @since GemFire 5.0
-   * @throws UnsupportedOperationException OVERRIDES
-   */
   @Override
-  public void clear() {
-    throw new UnsupportedOperationException();
+  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
+    synchronized (clearLock) {
+      final DistributedLockService lockService = getPartitionedRegionLockService();
+      try {
+        lockService.lock("_clearOperation", -1, -1);
+      } catch (IllegalStateException e) {
+        lockCheckReadiness();
+      }
+      try {
+        if (cache.isCacheAtShutdownAll()) {
+          throw cache.getCacheClosedException("Cache is shutting down");
+        }
+
+        // create ClearPRMessage per bucket
+        ArrayList<ClearPRMessage> clearMsgList =
+            (ArrayList<ClearPRMessage>) createClearPRMessages();
+        for (ClearPRMessage clearPRMessage : clearMsgList) {
+          int bucketId = clearPRMessage.getBucketId();
+          checkReadiness();
+          long then = 0;
+          try {
+            sendClearMsgByBucket(bucketId, clearPRMessage);
+          } catch (PartitionOfflineException poe) {
+            // TODO add a PartialResultException
+            logger.info("PR.sendClearMsgByBucket encountered 
PartitionOfflineException at bucket "
+                + bucketId, poe);
+          } catch (Exception e) {
+            logger.info("PR.sendClearMsgByBucket encountered exception at 
bucket " + bucketId, e);
+          }
+
+          if (logger.isDebugEnabled()) {
+            long now = System.currentTimeMillis();
+            if (now - then > 10000) {
+              logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", 
bucketId,
+                  (now - then));
+            }
+          }
+          // TODO add psStats
+        }
+      } finally {
+        try {
+          lockService.unlock("_clearOperation");
+        } catch (IllegalStateException e) {
+          lockCheckReadiness();
+        }
+      }
+
+      // notify bridge clients at PR level
+      notifyBridgeClients(regionEvent);
+    }
   }
 
-  @Override
-  void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) {
-    throw new UnsupportedOperationException();
+  void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) {
+    RetryTimeKeeper retryTime = null;
+    InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null);
+    if (logger.isDebugEnabled()) {
+      logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", 
bucketId,
+          currentTarget);
+    }
+
+    long timeOut = 0;
+    int count = 0;
+    for (;;) {
+      switch (count) {
+        case 0:
+          // Note we don't check for DM cancellation in common case.
+          // First time. Assume success, keep going.
+          break;
+        case 1:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // Second time (first failure). Calculate timeout and keep going.
+          timeOut = System.currentTimeMillis() + this.retryTimeout;
+          break;
+        default:
+          this.cache.getCancelCriterion().checkCancelInProgress(null);
+          // test for timeout
+          long timeLeft = timeOut - System.currentTimeMillis();
+          if (timeLeft < 0) {
+            PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" 
+ bucketId,
+                this.retryTimeout);
+            // NOTREACHED
+          }
+
+          // Didn't time out. Sleep a bit and then continue
+          boolean interrupted = Thread.interrupted();
+          try {
+            Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION);
+          } catch (InterruptedException ignore) {
+            interrupted = true;
+          } finally {
+            if (interrupted) {
+              Thread.currentThread().interrupt();
+            }
+          }
+          break;
+      } // switch
+      count++;
+
+      if (currentTarget == null) { // pick target
+        checkReadiness();
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+
+        currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false);
+        if (currentTarget == null) {
+          // the bucket does not exist, no need to clear
+          logger.info("Bucket " + bucketId + " does not contain data, no need 
to clear");
+          return;
+        } else {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", 
currentTarget);
+          }
+        }
+
+        // It's possible this is a GemFire thread e.g. ServerConnection
+        // which got to this point because of a distributed system shutdown or
+        // region closure which uses interrupt to break any sleep() or wait() calls
+        // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception
+        checkShutdown();
+        continue;
+      } // pick target
+
+      boolean result = false;
+      try {
+        final boolean isLocal = (this.localMaxMemory > 0) && 
currentTarget.equals(getMyId());
+        if (isLocal) {
+          result = clearPRMessage.doLocalClear(this, bucketId);
+        } else {
+          ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this);
+          if (response != null) {
+            this.prStats.incPartitionMessagesSent();
+            result = response.waitForResult();
+          }
+        }
+        if (result) {
+          return;
+        }
+      } catch (ForceReattemptException prce) {
+        checkReadiness();
+        InternalDistributedMember lastTarget = currentTarget;
+        if (retryTime == null) {
+          retryTime = new RetryTimeKeeper(this.retryTimeout);
+        }
+        currentTarget = getNodeForBucketWrite(bucketId, retryTime);
+        if (logger.isDebugEnabled()) {
+          logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", 
lastTarget,
+              currentTarget);
+        }
+        if (lastTarget.equals(currentTarget)) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("PR.sendClearMsgByBucket: Retrying at the same 
node:{} due to {}",
+                currentTarget, prce.getMessage());
+          }
+          if (retryTime.overMaximum()) {
+            PRHARedundancyProvider.timedOut(this, null, null, "update an 
entry",
+                this.retryTimeout);
+            // NOTREACHED
+          }
+          retryTime.waitToRetryNode();
+        }
+      }
+
+      // It's possible this is a GemFire thread e.g. ServerConnection
+      // which got to this point because of a distributed system shutdown or
+      // region closure which uses interrupt to break any sleep() or wait()
+      // calls
+      // e.g. waitForPrimary or waitForBucketRecovery in which case throw
+      // exception
+      checkShutdown();
+
+      // If we get here, the attempt failed...
+      if (count == 1) {
+        // TODO prStats add ClearPRMsg retried
+        this.prStats.incPutAllMsgsRetried();
+      }
+    }
+  }
+
+  List createClearPRMessages() {
+    if (cache.isCacheAtShutdownAll()) {
+      throw cache.getCacheClosedException("Cache is shutting down");
+    }
+
+    ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>();
+    for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) {
+      ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId);
+      clearMsgList.add(clearPRMessage);
+    }
+    return clearMsgList;
   }
 
   @Override
@@ -2574,7 +2754,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (isDebugEnabled) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new 
currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2715,7 +2895,7 @@ public class PartitionedRegion extends LocalRegion
             retryTime = new RetryTimeKeeper(this.retryTimeout);
           }
 
-          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+          currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
           if (logger.isDebugEnabled()) {
             logger.debug("PR.sendMsgByBucket: event size is {}, new 
currentTarget is {}",
                 getEntrySize(event), currentTarget);
@@ -2960,7 +3140,7 @@ public class PartitionedRegion extends LocalRegion
         if (retryTime == null) {
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
-        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId);
+        currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true);
 
         // It's possible this is a GemFire thread e.g. ServerConnection
         // which got to this point because of a distributed system shutdown or
@@ -3122,7 +3302,7 @@ public class PartitionedRegion extends LocalRegion
    * @return a Node which contains the bucket, potentially null
    */
   private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime,
-      EntryEventImpl event, Integer bucketId) {
+      EntryEventImpl event, Integer bucketId, boolean createIfNotExist) {
     InternalDistributedMember newNode;
     if (retryTime.overMaximum()) {
       PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket",
@@ -3132,7 +3312,7 @@ public class PartitionedRegion extends LocalRegion
 
     retryTime.waitForBucketsRecovery();
     newNode = getNodeForBucketWrite(bucketId, retryTime);
-    if (newNode == null) {
+    if (newNode == null && createIfNotExist) {
       newNode = createBucket(bucketId, getEntrySize(event), retryTime);
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
index 402b7f2..f155a7e 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java
@@ -119,6 +119,11 @@ public class RegionEventImpl
     return region;
   }
 
+  public void setRegion(LocalRegion region) {
+    this.region = region;
+    this.distributedMember = region.getMyId();
+  }
+
   @Override
   public Operation getOperation() {
     return this.op;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
index 1a8aba1..15b281b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java
@@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.CacheException;
+import org.apache.geode.cache.Operation;
 import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -54,11 +55,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
 public class ClearPRMessage extends PartitionMessageWithDirectReply {
   private static final Logger logger = LogService.getLogger();
 
-  private RegionEventImpl regionEvent;
-
   private Integer bucketId;
 
-  /** The time in ms to wait for a lock to be obtained during doLocalClear() */
+  /**
+   * The time in ms to wait for a lock to be obtained during doLocalClear()
+   */
   public static final int LOCK_WAIT_TIMEOUT_MS = 1000;
   public static final String BUCKET_NON_PRIMARY_MESSAGE =
       "The bucket region on target member is no longer primary";
@@ -85,10 +86,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     this.posDup = false;
   }
 
-  public void setRegionEvent(RegionEventImpl event) {
-    regionEvent = event;
-  }
-
   public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients,
       DirectReplyProcessor replyProcessor) {
     this.resetRecipients();
@@ -109,14 +106,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
-  public RegionEventImpl getRegionEvent() {
-    return regionEvent;
-  }
-
   public ClearResponse send(DistributedMember recipient, PartitionedRegion region)
       throws ForceReattemptException {
-    Set<InternalDistributedMember> recipients =
-        Collections.singleton((InternalDistributedMember) recipient);
+    Set recipients = Collections.singleton(recipient);
+    ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients);
     initMessage(region, recipients, clearResponse);
     if (logger.isDebugEnabled()) {
@@ -143,7 +135,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     } else {
       InternalDataSerializer.writeSignedVL(bucketId, out);
     }
-    DataSerializer.writeObject(regionEvent, out);
   }
 
   @Override
@@ -151,12 +142,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
-    this.regionEvent = DataSerializer.readObject(in);
   }
 
   @Override
   public EventID getEventID() {
-    return regionEvent.getEventId();
+    return null;
   }
 
   /**
@@ -169,7 +159,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager,
       PartitionedRegion region, long startTime) {
     try {
-      result = doLocalClear(region);
+      result = doLocalClear(region, getBucketId());
     } catch (ForceReattemptException ex) {
       sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region,
           startTime);
@@ -179,39 +169,31 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
     return false;
   }
 
-  public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException {
+  public int getBucketId() {
+    return this.bucketId;
+  }
+
+  public boolean doLocalClear(PartitionedRegion region, int bucketId)
+      throws ForceReattemptException {
     // Retrieve local bucket region which matches target bucketId
-    BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId);
+    BucketRegion bucketRegion =
+        region.getDataStore().getInitializedBucketForId(null, this.bucketId);
 
+    boolean lockedForPrimary = bucketRegion.doLockForPrimary(false);
     // Check if we are primary, throw exception if not
-    if (!bucketRegion.isPrimary()) {
+    if (!lockedForPrimary) {
       throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
     }
-
-    DistributedLockService lockService = getPartitionRegionLockService();
-    String lockName = bucketRegion.getFullPath();
     try {
-      boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1);
-
-      if (!locked) {
-        throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-      }
-
-      // Double check if we are still primary, as this could have changed between our first check
-      // and obtaining the lock
-      if (!bucketRegion.isPrimary()) {
-        throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE);
-      }
-
-      try {
-        bucketRegion.cmnClearRegion(regionEvent, true, true);
-      } catch (Exception ex) {
-        throw new ForceReattemptException(
-            EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
-      }
-
+      RegionEventImpl regionEvent = new RegionEventImpl();
+      regionEvent.setOperation(Operation.REGION_CLEAR);
+      regionEvent.setRegion(bucketRegion);
+      bucketRegion.cmnClearRegion(regionEvent, true, true);
+    } catch (Exception ex) {
+      throw new ForceReattemptException(
+          EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex);
     } finally {
-      lockService.unlock(lockName);
+      bucketRegion.doUnlockForPrimary();
     }
 
     return true;
@@ -277,7 +259,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
   }
 
   public static class ClearReplyMessage extends ReplyMessage {
-    /** Result of the Clear operation */
+    /**
+     * Result of the Clear operation
+     */
     boolean result;
 
     @Override
@@ -298,7 +282,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply {
       setException(ex);
     }
 
-    /** Send an ack */
+    /**
+     * Send an ack
+     */
     public static void send(InternalDistributedMember recipient, int processorId,
         ReplySender replySender,
         boolean result, ReplyException ex) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
index 2cf5231..3568ff0 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java
@@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doNothing;
@@ -38,7 +37,6 @@ import java.util.Set;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.geode.distributed.DistributedLockService;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionManager;
@@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionDataStore;
 import org.apache.geode.internal.cache.PartitionedRegionStats;
+import org.apache.geode.internal.cache.RegionEventImpl;
 
 public class ClearPRMessageTest {
 
@@ -61,64 +60,43 @@ public class ClearPRMessageTest {
   @Before
   public void setup() throws ForceReattemptException {
     message = spy(new ClearPRMessage());
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
     region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS);
     dataStore = mock(PartitionedRegionDataStore.class);
     when(region.getDataStore()).thenReturn(dataStore);
+    when(region.getFullPath()).thenReturn("/test");
     bucketRegion = mock(BucketRegion.class);
+    when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion);
+    RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class);
+    // RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null, false, member, true);
   }
 
   @Test
   public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() {
     when(bucketRegion.isPrimary()).thenReturn(false);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
   }
 
   @Test
   public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(false);
 
-    when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false);
-    when(bucketRegion.isPrimary()).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
-        .isInstanceOf(ForceReattemptException.class)
-        .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE);
-  }
-
-  @Test
-  public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
-
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 0))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
   public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
     NullPointerException exception = new NullPointerException("Error 
encountered");
     doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean());
 
-    // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
 
-    assertThatThrownBy(() -> message.doLocalClear(region))
+    assertThatThrownBy(() -> message.doLocalClear(region, 
bucketRegion.getId()))
         .isInstanceOf(ForceReattemptException.class)
         .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION);
 
@@ -129,21 +107,13 @@ public class ClearPRMessageTest {
   @Test
   public void 
doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained()
       throws ForceReattemptException {
-    DistributedLockService mockLockService = mock(DistributedLockService.class);
-    doReturn(mockLockService).when(message).getPartitionRegionLockService();
-
 
     // Be primary on the first check, then be not primary on the second check
-    when(bucketRegion.isPrimary()).thenReturn(true);
-    when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true);
-    assertThat(message.doLocalClear(region)).isTrue();
+    when(bucketRegion.doLockForPrimary(false)).thenReturn(true);
+    assertThat(message.doLocalClear(region, 0)).isTrue();
 
     // Confirm that cmnClearRegion was called
     verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean());
-
-    // Confirm that we actually obtained and released the lock
-    verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong());
-    verify(mockLockService, times(1)).unlock(any());
   }
 
   @Test
@@ -197,7 +167,8 @@ public class ClearPRMessageTest {
     int processorId = 1000;
     int startTime = 0;
 
-    doReturn(true).when(message).doLocalClear(region);
+    doReturn(0).when(message).getBucketId();
+    doReturn(true).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
 
@@ -222,7 +193,8 @@ public class ClearPRMessageTest {
     ForceReattemptException exception =
         new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE);
 
-    doThrow(exception).when(message).doLocalClear(region);
+    doReturn(0).when(message).getBucketId();
+    doThrow(exception).when(message).doLocalClear(region, 0);
     doReturn(sender).when(message).getSender();
     doReturn(processorId).when(message).getProcessorId();
 
