This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-7682 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 658afb35d4ac4f04db8e69f59762112f746e1751 Author: zhouxh <gz...@pivotal.io> AuthorDate: Fri Feb 14 17:44:47 2020 -0800 GEODE-7682: add clear API for PR --- .../cache/PartitionedRegionClearDUnitTest.java | 133 +++++++++++++ .../codeAnalysis/sanctionedDataSerializables.txt | 6 +- .../org/apache/geode/internal/DSFIDFactory.java | 3 + .../geode/internal/cache/DistributedRegion.java | 9 - .../apache/geode/internal/cache/LocalRegion.java | 10 + .../geode/internal/cache/PartitionedRegion.java | 207 +++++++++++++++++++-- .../geode/internal/cache/RegionEventImpl.java | 5 + .../internal/cache/partitioned/ClearPRMessage.java | 76 +++----- .../cache/partitioned/ClearPRMessageTest.java | 60 ++---- 9 files changed, 394 insertions(+), 115 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java new file mode 100644 index 0000000..fd1a10b --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionClearDUnitTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Serializable; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.PartitionAttributesFactory; +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + + +public class PartitionedRegionClearDUnitTest implements Serializable { + protected static final String REGION_NAME = "testPR"; + protected static final int NUM_ENTRIES = 1000; + + protected int locatorPort; + protected MemberVM locator; + protected MemberVM dataStore1, dataStore2, accessor; + protected ClientVM client1, client2; + + @Rule + public ClusterStartupRule cluster = new ClusterStartupRule(6); + + @Before + public void setUp() throws Exception { + locator = cluster.startLocatorVM(0); + locatorPort = locator.getPort(); + dataStore1 = cluster.startServerVM(1, locatorPort); + dataStore2 = cluster.startServerVM(2, locatorPort); + accessor = cluster.startServerVM(3, locatorPort); + client1 = cluster.startClientVM(4, c -> c.withLocatorConnection((locatorPort))); + client2 = cluster.startClientVM(5, c -> c.withLocatorConnection((locatorPort))); + + prepare(); + } + + private void prepare() { + dataStore1.invoke(this::initDataStore); + dataStore2.invoke(this::initDataStore); + accessor.invoke(this::initAccessor); + + accessor.invoke(this::insertEntries); + dataStore1.invoke(this::verifyDataIsLoaded); + dataStore2.invoke(this::verifyDataIsLoaded); + } + + private void verifyDataIsLoaded() { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(NUM_ENTRIES); + } + + public Cache getCache() { + Cache cache = ClusterStartupRule.getCache(); + assertThat(cache).isNotNull(); + + return cache; + } + + private void initDataStore() { + Cache cache = getCache(); + cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) + .setPartitionAttributes(new PartitionAttributesFactory().setTotalNumBuckets(10).create()) + .create(REGION_NAME); + } + + private void initAccessor() { + Cache cache = getCache(); + cache.createRegionFactory(RegionShortcut.PARTITION_REDUNDANT) + .setPartitionAttributes( + new PartitionAttributesFactory().setTotalNumBuckets(10).setLocalMaxMemory(0).create()) + .create(REGION_NAME); + } + + private void insertEntries() { + Cache cache = getCache(); + Region region = cache.getRegion(REGION_NAME); + IntStream.range(0, NUM_ENTRIES).forEach(i -> region.put(i, "value" + i)); + assertThat(region.size()).isEqualTo(NUM_ENTRIES); + } + + @Test + public void normalClearFromDataStore() { + dataStore1.invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(NUM_ENTRIES); + + region.clear(); + assertThat(region.size()).isEqualTo(0); + }); + dataStore2.invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(0); + }); + } + + @Test + public void normalClearFromAccessor() { + accessor.invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(NUM_ENTRIES); + region.clear(); + + assertThat(region.size()).isEqualTo(0); + }); + dataStore2.invoke(() -> { + Region region = getCache().getRegion(REGION_NAME); + assertThat(region.size()).isEqualTo(0); + }); + } +} diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index c7af2f0..316eda7 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1436,8 +1436,8 @@ fromData,27 toData,27 org/apache/geode/internal/cache/partitioned/ClearPRMessage,2 -fromData,30 -toData,44 +fromData,19 +toData,36 org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2 fromData,17 @@ -2101,4 +2101,4 @@ toData,105 org/apache/geode/pdx/internal/PdxType,2 fromData,109 -toData,124 \ No newline at end of file +toData,124 diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index 81ee359..36d4bd5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -284,6 +284,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe; import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage; import org.apache.geode.internal.cache.partitioned.CreateBucketMessage; @@ -972,6 +973,8 @@ public class DSFIDFactory implements DataSerializableFixedID { serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY, GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class); serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class); + serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class); + serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 1a62919..900d85e 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute @MutableForTesting public static boolean ignoreReconnect = false; - /** - * Lock to prevent multiple threads on this member from performing a clear at the same time. - */ - private final Object clearLock = new Object(); private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true); @MakeNotStatic @@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } - private void lockCheckReadiness() { - cache.getCancelCriterion().checkCancelInProgress(null); - checkReadiness(); - } - @Override Object validatedDestroy(Object key, EntryEventImpl event) throws TimeoutException, EntryNotFoundException, CacheWriterException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 6a7e2d2..d5f9156 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private final Lock clientMetaDataLock = new ReentrantLock(); /** + * Lock to prevent multiple threads on this member from performing a clear at the same time. + */ + protected final Object clearLock = new Object(); + + /** * Lock for updating the cache service profile for the region. */ private final Lock cacheServiceProfileUpdateLock = new ReentrantLock(); @@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, checkRegionDestroyed(true); } + protected void lockCheckReadiness() { + cache.getCancelCriterion().checkCancelInProgress(null); + checkReadiness(); + } + /** * This method should be called when the caller cannot locate an entry and that condition is * unexpected. This will first double check the cache and region state before throwing an diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 2c1ec04..86fb18d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -179,6 +179,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; import org.apache.geode.internal.cache.partitioned.DestroyMessage; @@ -2144,18 +2145,196 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - /** - * @since GemFire 5.0 - * @throws UnsupportedOperationException OVERRIDES - */ @Override - public void clear() { - throw new UnsupportedOperationException(); + void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { + synchronized (clearLock) { + final DistributedLockService lockService = getPartitionedRegionLockService(); + try { + lockService.lock("_clearOperation", -1, -1); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + try { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + // create ClearPRMessage per bucket + ArrayList<ClearPRMessage> clearMsgList = (ArrayList) createClearPRMessages(); + for (ClearPRMessage clearPRMessage : clearMsgList) { + int bucketId = clearPRMessage.getBucketId(); + checkReadiness(); + long then = 0; + try { + sendClearMsgByBucket(bucketId, clearPRMessage); + } catch (PartitionOfflineException poe) { + // TODO add a PartialResultException + logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket " + + bucketId, poe); + } catch (Exception e) { + logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e); + } + + if (logger.isDebugEnabled()) { + long now = System.currentTimeMillis(); + if (now - then > 10000) { + logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId, + (now - then)); + } + } + // TODO add psStats + } + } finally { + try { + lockService.unlock("_clearOperation"); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + } + + // notify bridge clients at PR level + notifyBridgeClients(regionEvent); + } } - @Override - void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { - throw new UnsupportedOperationException(); + void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) { + RetryTimeKeeper retryTime = null; + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId, + currentTarget); + } + + long timeOut = 0; + int count = 0; + for (;;) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. + break; + case 1: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + this.retryTimeout; + break; + default: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId, + this.retryTimeout); + // NOTREACHED + } + + // Didn't time out. Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + + currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false); + if (currentTarget == null) { + // the bucket does not exist, no need to clear + logger.info("Bucket " + bucketId + " does not contain data, no need to clear"); + return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + boolean result = false; + try { + final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (isLocal) { + result = clearPRMessage.doLocalClear(this, bucketId); + } else { + ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this); + if (response != null) { + this.prStats.incPartitionMessagesSent(); + result = response.waitForResult(); + } + } + if (result) { + return; + } + } catch (ForceReattemptException prce) { + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget, + currentTarget); + } + if (lastTarget.equals(currentTarget)) { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}", + currentTarget, prce.getMessage()); + } + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", + this.retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw + // exception + checkShutdown(); + + // If we get here, the attempt failed... + if (count == 1) { + // TODO prStats add ClearPRMsg retried + this.prStats.incPutAllMsgsRetried(); + } + } + } + + List createClearPRMessages() { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); + for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) { + ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId); + clearMsgList.add(clearPRMessage); + } + return clearMsgList; } @Override @@ -2574,7 +2753,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (isDebugEnabled) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2715,7 +2894,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (logger.isDebugEnabled()) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2960,7 +3139,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or @@ -3122,7 +3301,7 @@ public class PartitionedRegion extends LocalRegion * @return a Node which contains the bucket, potentially null */ private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime, - EntryEventImpl event, Integer bucketId) { + EntryEventImpl event, Integer bucketId, boolean createIfNotExist) { InternalDistributedMember newNode; if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket", @@ -3132,7 +3311,7 @@ public class PartitionedRegion extends LocalRegion retryTime.waitForBucketsRecovery(); newNode = getNodeForBucketWrite(bucketId, retryTime); - if (newNode == null) { + if (newNode == null && createIfNotExist) { newNode = createBucket(bucketId, getEntrySize(event), retryTime); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java index 402b7f2..f155a7e 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/RegionEventImpl.java @@ -119,6 +119,11 @@ public class RegionEventImpl return region; } + public void setRegion(LocalRegion region) { + this.region = region; + this.distributedMember = region.getMyId(); + } + @Override public Operation getOperation() { return this.op; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java index 1a8aba1..15b281b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.annotations.VisibleForTesting; import org.apache.geode.cache.CacheException; +import org.apache.geode.cache.Operation; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; @@ -54,11 +55,11 @@ import org.apache.geode.logging.internal.log4j.api.LogService; public class ClearPRMessage extends PartitionMessageWithDirectReply { private static final Logger logger = LogService.getLogger(); - private RegionEventImpl regionEvent; - private Integer bucketId; - /** The time in ms to wait for a lock to be obtained during doLocalClear() */ + /** + * The time in ms to wait for a lock to be obtained during doLocalClear() + */ public static final int LOCK_WAIT_TIMEOUT_MS = 1000; public static final String BUCKET_NON_PRIMARY_MESSAGE = "The bucket region on target member is no longer primary"; @@ -85,10 +86,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { this.posDup = false; } - public void setRegionEvent(RegionEventImpl event) { - regionEvent = event; - } - public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients, DirectReplyProcessor replyProcessor) { this.resetRecipients(); @@ -109,14 +106,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { return true; } - public RegionEventImpl getRegionEvent() { - return regionEvent; - } - public ClearResponse send(DistributedMember recipient, PartitionedRegion region) throws ForceReattemptException { - Set<InternalDistributedMember> recipients = - Collections.singleton((InternalDistributedMember) recipient); + Set recipients = Collections.singleton(recipient); ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients); initMessage(region, recipients, clearResponse); if (logger.isDebugEnabled()) { @@ -143,7 +135,6 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } else { InternalDataSerializer.writeSignedVL(bucketId, out); } - DataSerializer.writeObject(regionEvent, out); } @Override @@ -151,12 +142,11 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { throws IOException, ClassNotFoundException { super.fromData(in, context); this.bucketId = (int) InternalDataSerializer.readSignedVL(in); - this.regionEvent = DataSerializer.readObject(in); } @Override public EventID getEventID() { - return regionEvent.getEventId(); + return null; } /** @@ -169,7 +159,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, PartitionedRegion region, long startTime) { try { - result = doLocalClear(region); + result = doLocalClear(region, getBucketId()); } catch (ForceReattemptException ex) { sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region, startTime); @@ -179,39 +169,31 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { return false; } - public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException { + public int getBucketId() { + return this.bucketId; + } + + public boolean doLocalClear(PartitionedRegion region, int bucketId) + throws ForceReattemptException { // Retrieve local bucket region which matches target bucketId - BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId); + BucketRegion bucketRegion = + region.getDataStore().getInitializedBucketForId(null, this.bucketId); + boolean lockedForPrimary = bucketRegion.doLockForPrimary(false); // Check if we are primary, throw exception if not - if (!bucketRegion.isPrimary()) { + if (!lockedForPrimary) { throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); } - - DistributedLockService lockService = getPartitionRegionLockService(); - String lockName = bucketRegion.getFullPath(); try { - boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1); - - if (!locked) { - throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - // Double check if we are still primary, as this could have changed between our first check - // and obtaining the lock - if (!bucketRegion.isPrimary()) { - throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); - } - - try { - bucketRegion.cmnClearRegion(regionEvent, true, true); - } catch (Exception ex) { - throw new ForceReattemptException( - EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); - } - + RegionEventImpl regionEvent = new RegionEventImpl(); + regionEvent.setOperation(Operation.REGION_CLEAR); + regionEvent.setRegion(bucketRegion); + bucketRegion.cmnClearRegion(regionEvent, true, true); + } catch (Exception ex) { + throw new ForceReattemptException( + EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); } finally { - lockService.unlock(lockName); + bucketRegion.doUnlockForPrimary(); } return true; @@ -277,7 +259,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { } public static class ClearReplyMessage extends ReplyMessage { - /** Result of the Clear operation */ + /** + * Result of the Clear operation + */ boolean result; @Override @@ -298,7 +282,9 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { setException(ex); } - /** Send an ack */ + /** + * Send an ack + */ public static void send(InternalDistributedMember recipient, int processorId, ReplySender replySender, boolean result, ReplyException ex) { diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java index 2cf5231..3568ff0 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java @@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doNothing; @@ -38,7 +37,6 @@ import java.util.Set; import org.junit.Before; import org.junit.Test; -import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -50,6 +48,7 @@ import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; import org.apache.geode.internal.cache.PartitionedRegionStats; +import org.apache.geode.internal.cache.RegionEventImpl; public class ClearPRMessageTest { @@ -61,64 +60,43 @@ public class ClearPRMessageTest { @Before public void setup() throws ForceReattemptException { message = spy(new ClearPRMessage()); + InternalDistributedMember member = mock(InternalDistributedMember.class); region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS); dataStore = mock(PartitionedRegionDataStore.class); when(region.getDataStore()).thenReturn(dataStore); + when(region.getFullPath()).thenReturn("/test"); bucketRegion = mock(BucketRegion.class); when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion); + RegionEventImpl bucketRegionEventImpl = mock(RegionEventImpl.class); + // RegionEventImpl(bucketRegion, Operation.REGION_CLEAR, null, false, member, true); } @Test public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() { when(bucketRegion.isPrimary()).thenReturn(false); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); } @Test public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); + when(bucketRegion.doLockForPrimary(false)).thenReturn(false); - when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false); - when(bucketRegion.isPrimary()).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - @Test - public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); NullPointerException exception = new NullPointerException("Error encountered"); doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, bucketRegion.getId())) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION); @@ -129,21 +107,13 @@ public class ClearPRMessageTest { @Test public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained() throws ForceReattemptException { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - assertThat(message.doLocalClear(region)).isTrue(); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); + assertThat(message.doLocalClear(region, 0)).isTrue(); // Confirm that cmnClearRegion was called verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test @@ -197,7 +167,8 @@ public class ClearPRMessageTest { int processorId = 1000; int startTime = 0; - doReturn(true).when(message).doLocalClear(region); + doReturn(0).when(message).getBucketId(); + doReturn(true).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId(); @@ -222,7 +193,8 @@ public class ClearPRMessageTest { ForceReattemptException exception = new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - doThrow(exception).when(message).doLocalClear(region); + doReturn(0).when(message).getBucketId(); + doThrow(exception).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId();