This is an automated email from the ASF dual-hosted git repository. zhouxj pushed a commit to branch feature/GEODE-7682 in repository https://gitbox.apache.org/repos/asf/geode.git
commit 7e46a971e2fac9ef8217ec35bbfdb84f45693ba6 Author: zhouxh <gz...@pivotal.io> AuthorDate: Fri Feb 14 17:44:47 2020 -0800 GEODE-7682: add clear API for PR --- .../org/apache/geode/internal/DSFIDFactory.java | 3 + .../geode/internal/cache/DistributedRegion.java | 9 - .../apache/geode/internal/cache/LocalRegion.java | 10 + .../geode/internal/cache/PartitionedRegion.java | 204 +++++++++++++++++++-- .../internal/cache/partitioned/ClearPRMessage.java | 43 ++--- .../cache/partitioned/ClearPRMessageTest.java | 55 ++---- 6 files changed, 230 insertions(+), 94 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java index 81ee359..36d4bd5 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java @@ -284,6 +284,7 @@ import org.apache.geode.internal.cache.partitioned.BucketCountLoadProbe; import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage; import org.apache.geode.internal.cache.partitioned.BucketSizeMessage.BucketSizeReplyMessage; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueReplyMessage; import org.apache.geode.internal.cache.partitioned.CreateBucketMessage; @@ -972,6 +973,8 @@ public class DSFIDFactory implements DataSerializableFixedID { serializer.registerDSFID(GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY, GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationEntry.class); serializer.registerDSFID(ABORT_BACKUP_REQUEST, AbortBackupRequest.class); + serializer.registerDSFID(PR_CLEAR_MESSAGE, ClearPRMessage.class); + serializer.registerDSFID(PR_CLEAR_REPLY_MESSAGE, ClearPRMessage.ClearReplyMessage.class); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index 1a62919..900d85e 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -189,10 +189,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute @MutableForTesting public static boolean ignoreReconnect = false; - /** - * Lock to prevent multiple threads on this member from performing a clear at the same time. - */ - private final Object clearLock = new Object(); private final ReentrantReadWriteLock failedInitialImageLock = new ReentrantReadWriteLock(true); @MakeNotStatic @@ -927,11 +923,6 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute } } - private void lockCheckReadiness() { - cache.getCancelCriterion().checkCancelInProgress(null); - checkReadiness(); - } - @Override Object validatedDestroy(Object key, EntryEventImpl event) throws TimeoutException, EntryNotFoundException, CacheWriterException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 6a7e2d2..d5f9156 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -473,6 +473,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, private final Lock clientMetaDataLock = new ReentrantLock(); /** + * Lock to prevent multiple threads on this member from performing a clear at the same time. + */ + protected final Object clearLock = new Object(); + + /** * Lock for updating the cache service profile for the region. */ private final Lock cacheServiceProfileUpdateLock = new ReentrantLock(); @@ -2750,6 +2755,11 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, checkRegionDestroyed(true); } + protected void lockCheckReadiness() { + cache.getCancelCriterion().checkCancelInProgress(null); + checkReadiness(); + } + /** * This method should be called when the caller cannot locate an entry and that condition is * unexpected. This will first double check the cache and region state before throwing an diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 2c1ec04..2f30cf8 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -179,6 +179,7 @@ import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultWa import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; import org.apache.geode.internal.cache.execute.ServerToClientFunctionResultSender; import org.apache.geode.internal.cache.ha.ThreadIdentifier; +import org.apache.geode.internal.cache.partitioned.ClearPRMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage; import org.apache.geode.internal.cache.partitioned.ContainsKeyValueMessage.ContainsKeyValueResponse; import org.apache.geode.internal.cache.partitioned.DestroyMessage; @@ -2144,18 +2145,193 @@ public class PartitionedRegion extends LocalRegion throw new UnsupportedOperationException(); } - /** - * @since GemFire 5.0 - * @throws UnsupportedOperationException OVERRIDES - */ @Override - public void clear() { - throw new UnsupportedOperationException(); + void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { + synchronized (clearLock) { + final DistributedLockService lockService = getPartitionedRegionLockService(); + try { + lockService.lock("_clearOperation", -1, -1); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + try { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + // create ClearPRMessage per bucket + ArrayList<ClearPRMessage> clearMsgList = (ArrayList) createClearPRMessages(); + for (ClearPRMessage clearPRMessage : clearMsgList) { + int bucketId = clearPRMessage.getBucketId(); + checkReadiness(); + long then = 0; + try { + sendClearMsgByBucket(bucketId, clearPRMessage); + } catch (PartitionOfflineException poe) { + // TODO add a PartialResultException + logger.info("PR.sendClearMsgByBucket encountered PartitionOfflineException at bucket " + + bucketId, poe); + } catch (Exception e) { + logger.info("PR.sendClearMsgByBucket encountered exception at bucket " + bucketId, e); + } + + if (logger.isDebugEnabled()) { + long now = System.currentTimeMillis(); + if (now - then > 10000) { + logger.debug("PR.sendClearMsgByBucket for bucket {} took {} ms", bucketId, + (now - then)); + } + } + // TODO add psStats + } + } finally { + try { + lockService.unlock("_clearOperation"); + } catch (IllegalStateException e) { + lockCheckReadiness(); + } + } + + // notify bridge clients at PR level + notifyBridgeClients(regionEvent); + } } - @Override - void basicClear(RegionEventImpl regionEvent, boolean cacheWrite) { - throw new UnsupportedOperationException(); + void sendClearMsgByBucket(final Integer bucketId, ClearPRMessage clearPRMessage) { + RetryTimeKeeper retryTime = null; + InternalDistributedMember currentTarget = getNodeForBucketWrite(bucketId, null); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket:bucket {}'s currentTarget is {}", bucketId, + currentTarget); + } + + long timeOut = 0; + int count = 0; + for (;;) { + switch (count) { + case 0: + // Note we don't check for DM cancellation in common case. + // First time. Assume success, keep going. + break; + case 1: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // Second time (first failure). Calculate timeout and keep going. + timeOut = System.currentTimeMillis() + this.retryTimeout; + break; + default: + this.cache.getCancelCriterion().checkCancelInProgress(null); + // test for timeout + long timeLeft = timeOut - System.currentTimeMillis(); + if (timeLeft < 0) { + PRHARedundancyProvider.timedOut(this, null, null, "clear a bucket" + bucketId, + this.retryTimeout); + // NOTREACHED + } + + // Didn't time out. Sleep a bit and then continue + boolean interrupted = Thread.interrupted(); + try { + Thread.sleep(PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION); + } catch (InterruptedException ignore) { + interrupted = true; + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + break; + } // switch + count++; + + if (currentTarget == null) { // pick target + checkReadiness(); + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + + currentTarget = waitForNodeOrCreateBucket(retryTime, null, bucketId, false); + if (currentTarget == null) { + // the bucket does not exist, no need to clear + logger.info("Bucket " + bucketId + " does not contain data, no need to clear"); + return; + } else { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: new currentTarget is {}", currentTarget); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw exception + checkShutdown(); + continue; + } // pick target + + boolean result = false; + try { + final boolean isLocal = (this.localMaxMemory > 0) && currentTarget.equals(getMyId()); + if (isLocal) { + result = clearPRMessage.doLocalClear(this, bucketId); + } else { + ClearPRMessage.ClearResponse response = clearPRMessage.send(currentTarget, this); + if (response != null) { + this.prStats.incPartitionMessagesSent(); + result = response.waitForResult(); + } + } + } catch (ForceReattemptException prce) { + checkReadiness(); + InternalDistributedMember lastTarget = currentTarget; + if (retryTime == null) { + retryTime = new RetryTimeKeeper(this.retryTimeout); + } + currentTarget = getNodeForBucketWrite(bucketId, retryTime); + if (logger.isDebugEnabled()) { + logger.debug("PR.sendMsgByBucket: Old target was {}, Retrying {}", lastTarget, + currentTarget); + } + if (lastTarget.equals(currentTarget)) { + if (logger.isDebugEnabled()) { + logger.debug("PR.sendClearMsgByBucket: Retrying at the same node:{} due to {}", + currentTarget, prce.getMessage()); + } + if (retryTime.overMaximum()) { + PRHARedundancyProvider.timedOut(this, null, null, "update an entry", + this.retryTimeout); + // NOTREACHED + } + retryTime.waitToRetryNode(); + } + } + + // It's possible this is a GemFire thread e.g. ServerConnection + // which got to this point because of a distributed system shutdown or + // region closure which uses interrupt to break any sleep() or wait() + // calls + // e.g. waitForPrimary or waitForBucketRecovery in which case throw + // exception + checkShutdown(); + + // If we get here, the attempt failed... + if (count == 1) { + // TODO prStats add ClearPRMsg retried + this.prStats.incPutAllMsgsRetried(); + } + } + } + + List createClearPRMessages() { + if (cache.isCacheAtShutdownAll()) { + throw cache.getCacheClosedException("Cache is shutting down"); + } + + ArrayList<ClearPRMessage> clearMsgList = new ArrayList<>(); + for (int bucketId = 0; bucketId < this.totalNumberOfBuckets; bucketId++) { + ClearPRMessage clearPRMessage = new ClearPRMessage(bucketId); + clearMsgList.add(clearPRMessage); + } + return clearMsgList; } @Override @@ -2574,7 +2750,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (isDebugEnabled) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2715,7 +2891,7 @@ public class PartitionedRegion extends LocalRegion retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); if (logger.isDebugEnabled()) { logger.debug("PR.sendMsgByBucket: event size is {}, new currentTarget is {}", getEntrySize(event), currentTarget); @@ -2960,7 +3136,7 @@ public class PartitionedRegion extends LocalRegion if (retryTime == null) { retryTime = new RetryTimeKeeper(this.retryTimeout); } - currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId); + currentTarget = waitForNodeOrCreateBucket(retryTime, event, bucketId, true); // It's possible this is a GemFire thread e.g. ServerConnection // which got to this point because of a distributed system shutdown or @@ -3122,7 +3298,7 @@ public class PartitionedRegion extends LocalRegion * @return a Node which contains the bucket, potentially null */ private InternalDistributedMember waitForNodeOrCreateBucket(RetryTimeKeeper retryTime, - EntryEventImpl event, Integer bucketId) { + EntryEventImpl event, Integer bucketId, boolean createIfNotExist) { InternalDistributedMember newNode; if (retryTime.overMaximum()) { PRHARedundancyProvider.timedOut(this, null, null, "allocate a bucket", @@ -3132,7 +3308,7 @@ public class PartitionedRegion extends LocalRegion retryTime.waitForBucketsRecovery(); newNode = getNodeForBucketWrite(bucketId, retryTime); - if (newNode == null) { + if (newNode == null && createIfNotExist) { newNode = createBucket(bucketId, getEntrySize(event), retryTime); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java index 1a8aba1..07bb311 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -169,7 +169,7 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, PartitionedRegion region, long startTime) { try { - result = doLocalClear(region); + result = doLocalClear(region, getBucketId()); } catch (ForceReattemptException ex) { sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region, startTime); @@ -179,39 +179,28 @@ public class ClearPRMessage extends PartitionMessageWithDirectReply { return false; } - public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException { + public int getBucketId() { + return this.bucketId; + } + + public boolean doLocalClear(PartitionedRegion region, int bucketId) + throws ForceReattemptException { // Retrieve local bucket region which matches target bucketId - BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId); + BucketRegion bucketRegion = + region.getDataStore().getInitializedBucketForId(null, this.bucketId); + boolean lockedForPrimary = bucketRegion.doLockForPrimary(false); // Check if we are primary, throw exception if not - if (!bucketRegion.isPrimary()) { + if (!lockedForPrimary) { throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); } - - DistributedLockService lockService = getPartitionRegionLockService(); - String lockName = bucketRegion.getFullPath(); try { - boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1); - - if (!locked) { - throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - // Double check if we are still primary, as this could have changed between our first check - // and obtaining the lock - if (!bucketRegion.isPrimary()) { - throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); - } - - try { - bucketRegion.cmnClearRegion(regionEvent, true, true); - } catch (Exception ex) { - throw new ForceReattemptException( - EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); - } - + bucketRegion.cmnClearRegion(regionEvent, true, true); + } catch (Exception ex) { + throw new ForceReattemptException( + EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); } finally { - lockService.unlock(lockName); + bucketRegion.doUnlockForPrimary(); } return true; diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java index 2cf5231..2778993 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java @@ -20,7 +20,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.doNothing; @@ -38,7 +37,6 @@ import java.util.Set; import org.junit.Before; import org.junit.Test; -import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.internal.ClusterDistributionManager; import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionManager; @@ -72,53 +70,28 @@ public class ClearPRMessageTest { public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() { when(bucketRegion.isPrimary()).thenReturn(false); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); } @Test public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); + when(bucketRegion.doLockForPrimary(false)).thenReturn(false); - when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false); - when(bucketRegion.isPrimary()).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) - .isInstanceOf(ForceReattemptException.class) - .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); - } - - @Test - public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, 0)) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); NullPointerException exception = new NullPointerException("Error encountered"); doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); - assertThatThrownBy(() -> message.doLocalClear(region)) + assertThatThrownBy(() -> message.doLocalClear(region, bucketRegion.getId())) .isInstanceOf(ForceReattemptException.class) .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION); @@ -129,21 +102,13 @@ public class ClearPRMessageTest { @Test public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained() throws ForceReattemptException { - DistributedLockService mockLockService = mock(DistributedLockService.class); - doReturn(mockLockService).when(message).getPartitionRegionLockService(); - // Be primary on the first check, then be not primary on the second check - when(bucketRegion.isPrimary()).thenReturn(true); - when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); - assertThat(message.doLocalClear(region)).isTrue(); + when(bucketRegion.doLockForPrimary(false)).thenReturn(true); + assertThat(message.doLocalClear(region, 0)).isTrue(); // Confirm that cmnClearRegion was called verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); - - // Confirm that we actually obtained and released the lock - verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); - verify(mockLockService, times(1)).unlock(any()); } @Test @@ -197,7 +162,8 @@ public class ClearPRMessageTest { int processorId = 1000; int startTime = 0; - doReturn(true).when(message).doLocalClear(region); + doReturn(0).when(message).getBucketId(); + doReturn(true).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId(); @@ -222,7 +188,8 @@ public class ClearPRMessageTest { ForceReattemptException exception = new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); - doThrow(exception).when(message).doLocalClear(region); + doReturn(0).when(message).getBucketId(); + doThrow(exception).when(message).doLocalClear(region, 0); doReturn(sender).when(message).getSender(); doReturn(processorId).when(message).getProcessorId();