This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch feature/GEODE-7665 in repository https://gitbox.apache.org/repos/asf/geode.git
commit ccb543fc2b3911304d15c84cda36b6ede0c2a41b Author: zhouxh <gz...@pivotal.io> AuthorDate: Mon Jan 27 17:02:48 2020 -0800 GEODE-7683: introduce BR.cmnClearRegion Co-authored-by: Xiaojian Zhou <gz...@pivotal.io> GEODE-7684: Create messaging class for PR Clear (#4689) * Added new message class and test Co-authored-by: Benjamin Ross <br...@pivotal.io> Co-authored-by: Donal Evans <doev...@pivotal.io> --- .../codeAnalysis/sanctionedDataSerializables.txt | 8 + .../apache/geode/internal/cache/BucketRegion.java | 38 +- .../geode/internal/cache/DistributedRegion.java | 23 +- .../internal/cache/partitioned/ClearPRMessage.java | 388 +++++++++++++++++++++ .../internal/cache/BucketRegionJUnitTest.java | 77 ++++ .../internal/cache/DistributedRegionJUnitTest.java | 18 + .../cache/partitioned/ClearPRMessageTest.java | 288 +++++++++++++++ .../serialization/DataSerializableFixedID.java | 3 + 8 files changed, 832 insertions(+), 11 deletions(-) diff --git a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt index 3076db7..d6806f2 100644 --- a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt +++ b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt @@ -1435,6 +1435,14 @@ org/apache/geode/internal/cache/partitioned/BucketSizeMessage$BucketSizeReplyMes fromData,27 toData,27 +org/apache/geode/internal/cache/partitioned/ClearPRMessage,2 +fromData,30 +toData,44 + +org/apache/geode/internal/cache/partitioned/ClearPRMessage$ClearReplyMessage,2 +fromData,17 +toData,17 + org/apache/geode/internal/cache/partitioned/ColocatedRegionDetails,2 fromData,81 toData,133 diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java index 
ac20526..db8c057 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java @@ -557,6 +557,36 @@ public class BucketRegion extends DistributedRegion implements Bucket { } } + @Override + public void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { + if (!getBucketAdvisor().isPrimary()) { + if (logger.isDebugEnabled()) { + logger.debug("Not primary bucket when doing clear, do nothing"); + } + return; + } + + boolean enableRVV = useRVV && getConcurrencyChecksEnabled(); + RegionVersionVector rvv = null; + if (enableRVV) { + rvv = getVersionVector().getCloneForTransmission(); + } + + // get rvvLock + Set<InternalDistributedMember> participants = + getCacheDistributionAdvisor().adviseInvalidateRegion(); + try { + obtainWriteLocksForClear(regionEvent, participants); + // no need to dominate my own rvv. + // Clear is on going here, there won't be GII for this member + clearRegionLocally(regionEvent, cacheWrite, null); + distributeClearOperation(regionEvent, rvv, participants); + + // TODO: call reindexUserDataRegion if there're lucene indexes + } finally { + releaseWriteLocksForClear(regionEvent, participants); + } + } long generateTailKey() { long key = eventSeqNum.addAndGet(partitionedRegion.getTotalNumberOfBuckets()); @@ -2094,11 +2124,9 @@ public class BucketRegion extends DistributedRegion implements Bucket { // if GII has failed, because there is not primary. So it's safe to set these // counters to 0. 
oldMemValue = bytesInMemory.getAndSet(0); - } - - else { - throw new InternalGemFireError( - "Trying to clear a bucket region that was not destroyed or in initialization."); + } else { + // BucketRegion's clear is supported now + oldMemValue = bytesInMemory.getAndSet(0); } if (oldMemValue != BUCKET_DESTROYED) { partitionedRegion.getPrStats().incDataStoreEntryCount(-sizeBeforeClear); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java index dd73b20..1465eef 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java @@ -2003,6 +2003,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute super.basicClear(regionEvent, cacheWrite); } + void distributeClearOperation(RegionEventImpl regionEvent, RegionVersionVector rvv, + Set<InternalDistributedMember> participants) { + DistributedClearOperation.clear(regionEvent, rvv, participants); + } @Override void cmnClearRegion(RegionEventImpl regionEvent, boolean cacheWrite, boolean useRVV) { @@ -2025,7 +2029,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute obtainWriteLocksForClear(regionEvent, participants); clearRegionLocally(regionEvent, cacheWrite, null); if (!regionEvent.isOriginRemote() && regionEvent.getOperation().isDistributed()) { - DistributedClearOperation.clear(regionEvent, null, participants); + distributeClearOperation(regionEvent, null, participants); } } finally { releaseWriteLocksForClear(regionEvent, participants); @@ -2081,10 +2085,12 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute /** * obtain locks preventing generation of new versions in other members */ - private void obtainWriteLocksForClear(RegionEventImpl regionEvent, + protected void 
obtainWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { lockLocallyForClear(getDistributionManager(), getMyId(), regionEvent); - DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); + if (!isUsedForPartitionedRegionBucket()) { + DistributedClearOperation.lockAndFlushToOthers(regionEvent, participants); + } } /** @@ -2121,7 +2127,7 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute /** * releases the locks obtained in obtainWriteLocksForClear */ - private void releaseWriteLocksForClear(RegionEventImpl regionEvent, + protected void releaseWriteLocksForClear(RegionEventImpl regionEvent, Set<InternalDistributedMember> participants) { ARMLockTestHook armLockTestHook = getRegionMap().getARMLockTestHook(); @@ -2129,8 +2135,13 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute armLockTestHook.beforeRelease(this, regionEvent); } - getVersionVector().unlockForClear(getMyId()); - DistributedClearOperation.releaseLocks(regionEvent, participants); + RegionVersionVector rvv = getVersionVector(); + if (rvv != null) { + rvv.unlockForClear(getMyId()); + } + if (!isUsedForPartitionedRegionBucket()) { + DistributedClearOperation.releaseLocks(regionEvent, participants); + } if (armLockTestHook != null) { armLockTestHook.afterRelease(this, regionEvent); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java new file mode 100644 index 0000000..1a8aba1 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/ClearPRMessage.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache.partitioned; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Collections; +import java.util.Set; + +import org.apache.logging.log4j.Logger; + +import org.apache.geode.DataSerializer; +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.cache.CacheException; +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DirectReplyProcessor; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.ReplyMessage; +import org.apache.geode.distributed.internal.ReplyProcessor21; +import org.apache.geode.distributed.internal.ReplySender; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Assert; +import org.apache.geode.internal.InternalDataSerializer; +import org.apache.geode.internal.NanoTimer; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.EventID; +import 
org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionHelper; +import org.apache.geode.internal.cache.RegionEventImpl; +import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.serialization.DeserializationContext; +import org.apache.geode.internal.serialization.SerializationContext; +import org.apache.geode.logging.internal.log4j.api.LogService; + +public class ClearPRMessage extends PartitionMessageWithDirectReply { + private static final Logger logger = LogService.getLogger(); + + private RegionEventImpl regionEvent; + + private Integer bucketId; + + /** The time in ms to wait for a lock to be obtained during doLocalClear() */ + public static final int LOCK_WAIT_TIMEOUT_MS = 1000; + public static final String BUCKET_NON_PRIMARY_MESSAGE = + "The bucket region on target member is no longer primary"; + public static final String BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE = + "A lock for the bucket region could not be obtained."; + public static final String EXCEPTION_THROWN_DURING_CLEAR_OPERATION = + "An exception was thrown during the local clear operation: "; + + /** + * state from operateOnRegion that must be preserved for transmission from the waiting pool + */ + transient boolean result = false; + + /** + * Empty constructor to satisfy {@link DataSerializer} requirements + */ + public ClearPRMessage() {} + + public ClearPRMessage(int bucketId) { + this.bucketId = bucketId; + + // These are both used by the parent class, but don't apply to this message type + this.notificationOnly = false; + this.posDup = false; + } + + public void setRegionEvent(RegionEventImpl event) { + regionEvent = event; + } + + public void initMessage(PartitionedRegion region, Set<InternalDistributedMember> recipients, + DirectReplyProcessor replyProcessor) { + this.resetRecipients(); + if (recipients != null) { + setRecipients(recipients); 
+ } + this.regionId = region.getPRId(); + this.processor = replyProcessor; + this.processorId = replyProcessor == null ? 0 : replyProcessor.getProcessorId(); + if (replyProcessor != null) { + replyProcessor.enableSevereAlertProcessing(); + } + } + + @Override + public boolean isSevereAlertCompatible() { + // allow forced-disconnect processing for all cache op messages + return true; + } + + public RegionEventImpl getRegionEvent() { + return regionEvent; + } + + public ClearResponse send(DistributedMember recipient, PartitionedRegion region) + throws ForceReattemptException { + Set<InternalDistributedMember> recipients = + Collections.singleton((InternalDistributedMember) recipient); + ClearResponse clearResponse = new ClearResponse(region.getSystem(), recipients); + initMessage(region, recipients, clearResponse); + if (logger.isDebugEnabled()) { + logger.debug("ClearPRMessage.send: recipient is {}, msg is {}", recipient, this); + } + + Set<InternalDistributedMember> failures = region.getDistributionManager().putOutgoing(this); + if (failures != null && failures.size() > 0) { + throw new ForceReattemptException("Failed sending <" + this + ">"); + } + return clearResponse; + } + + @Override + public int getDSFID() { + return PR_CLEAR_MESSAGE; + } + + @Override + public void toData(DataOutput out, SerializationContext context) throws IOException { + super.toData(out, context); + if (bucketId == null) { + InternalDataSerializer.writeSignedVL(-1, out); + } else { + InternalDataSerializer.writeSignedVL(bucketId, out); + } + DataSerializer.writeObject(regionEvent, out); + } + + @Override + public void fromData(DataInput in, DeserializationContext context) + throws IOException, ClassNotFoundException { + super.fromData(in, context); + this.bucketId = (int) InternalDataSerializer.readSignedVL(in); + this.regionEvent = DataSerializer.readObject(in); + } + + @Override + public EventID getEventID() { + return regionEvent.getEventId(); + } + + /** + * This method is called upon 
receipt and makes the desired changes to the PartitionedRegion. Note: + * It is very important that this message does NOT cause any deadlocks as the sender will wait + * indefinitely for the acknowledgement. + */ + @Override + @VisibleForTesting + protected boolean operateOnPartitionedRegion(ClusterDistributionManager distributionManager, + PartitionedRegion region, long startTime) { + try { + result = doLocalClear(region); + } catch (ForceReattemptException ex) { + sendReply(getSender(), getProcessorId(), distributionManager, new ReplyException(ex), region, + startTime); + return false; + } + sendReply(getSender(), getProcessorId(), distributionManager, null, region, startTime); + return false; + } + + public boolean doLocalClear(PartitionedRegion region) throws ForceReattemptException { + // Retrieve local bucket region which matches target bucketId + BucketRegion bucketRegion = region.getDataStore().getInitializedBucketForId(null, bucketId); + + // Check if we are primary, throw exception if not + if (!bucketRegion.isPrimary()) { + throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); + } + + DistributedLockService lockService = getPartitionRegionLockService(); + String lockName = bucketRegion.getFullPath(); + try { + boolean locked = lockService.lock(lockName, LOCK_WAIT_TIMEOUT_MS, -1); + + if (!locked) { + throw new ForceReattemptException(BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); + } + + // Double check if we are still primary, as this could have changed between our first check + // and obtaining the lock + if (!bucketRegion.isPrimary()) { + throw new ForceReattemptException(BUCKET_NON_PRIMARY_MESSAGE); + } + + try { + bucketRegion.cmnClearRegion(regionEvent, true, true); + } catch (Exception ex) { + throw new ForceReattemptException( + EXCEPTION_THROWN_DURING_CLEAR_OPERATION + ex.getClass().getName(), ex); + } + + } finally { + lockService.unlock(lockName); + } + + return true; + } + + // Extracted for testing + protected DistributedLockService 
getPartitionRegionLockService() { + return DistributedLockService + .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); + } + + @Override + public boolean canStartRemoteTransaction() { + return false; + } + + @Override + protected void sendReply(InternalDistributedMember member, int processorId, + DistributionManager distributionManager, ReplyException ex, + PartitionedRegion partitionedRegion, long startTime) { + if (partitionedRegion != null) { + if (startTime > 0) { + partitionedRegion.getPrStats().endPartitionMessagesProcessing(startTime); + } + } + ClearReplyMessage.send(member, processorId, getReplySender(distributionManager), this.result, + ex); + } + + @Override + protected void appendFields(StringBuilder buff) { + super.appendFields(buff); + buff.append("; bucketId=").append(this.bucketId); + } + + @Override + public String toString() { + StringBuilder buff = new StringBuilder(); + String className = getClass().getName(); + buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo> + buff.append("(prid="); // make sure this is the first one + buff.append(this.regionId); + + // Append name, if we have it + String name = null; + try { + PartitionedRegion region = PartitionedRegion.getPRFromId(this.regionId); + if (region != null) { + name = region.getFullPath(); + } + } catch (Exception ignore) { + /* ignored */ + } + if (name != null) { + buff.append(" (name = \"").append(name).append("\")"); + } + + appendFields(buff); + buff.append(" ,distTx="); + buff.append(this.isTransactionDistributed); + buff.append(")"); + return buff.toString(); + } + + public static class ClearReplyMessage extends ReplyMessage { + /** Result of the Clear operation */ + boolean result; + + @Override + public boolean getInlineProcess() { + return true; + } + + /** + * Empty constructor to conform to DataSerializable interface + */ + @SuppressWarnings("unused") + public ClearReplyMessage() {} + + private 
ClearReplyMessage(int processorId, boolean result, ReplyException ex) { + super(); + this.result = result; + setProcessorId(processorId); + setException(ex); + } + + /** Send an ack */ + public static void send(InternalDistributedMember recipient, int processorId, + ReplySender replySender, + boolean result, ReplyException ex) { + Assert.assertTrue(recipient != null, "ClearReplyMessage NULL reply message"); + ClearReplyMessage message = new ClearReplyMessage(processorId, result, ex); + message.setRecipient(recipient); + replySender.putOutgoing(message); + } + + /** + * Processes this message. This method is invoked by the receiver of the message. + * + * @param distributionManager the distribution manager that is processing the message. + */ + @Override + public void process(final DistributionManager distributionManager, + final ReplyProcessor21 replyProcessor) { + final long startTime = getTimestamp(); + if (replyProcessor == null) { + if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { + logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this); + } + return; + } + if (replyProcessor instanceof ClearResponse) { + ((ClearResponse) replyProcessor).setResponse(this); + } + replyProcessor.process(this); + + if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) { + logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", replyProcessor, this); + } + distributionManager.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime); + } + + @Override + public int getDSFID() { + return PR_CLEAR_REPLY_MESSAGE; + } + + @Override + public void fromData(DataInput in, + DeserializationContext context) throws IOException, ClassNotFoundException { + super.fromData(in, context); + this.result = in.readBoolean(); + } + + @Override + public void toData(DataOutput out, + SerializationContext context) throws IOException { + super.toData(out, context); + out.writeBoolean(this.result); + } + + @Override + public String toString() { + return "ClearReplyMessage " + "processorid=" + 
this.processorId + " returning " + this.result + + " exception=" + getException(); + } + } + + /** + * A processor to capture the value returned by {@link ClearPRMessage} + */ + public static class ClearResponse extends PartitionResponse { + private volatile boolean returnValue; + + public ClearResponse(InternalDistributedSystem distributedSystem, + Set<InternalDistributedMember> recipients) { + super(distributedSystem, recipients, false); + } + + public void setResponse(ClearReplyMessage response) { + this.returnValue = response.result; + } + + /** + * @return the result of the remote clear operation + * @throws ForceReattemptException if the peer is no longer available + * @throws CacheException if the peer generates an error + */ + public boolean waitForResult() throws CacheException, ForceReattemptException { + waitForCacheException(); + return this.returnValue; + } + } +} diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java index 72e6657..c7cf5a6 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionJUnitTest.java @@ -14,7 +14,9 @@ */ package org.apache.geode.internal.cache; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.anyLong; @@ -31,7 +33,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.junit.Test; + import org.apache.geode.cache.RegionAttributes; +import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.statistics.StatisticsClock; public class 
BucketRegionJUnitTest extends DistributedRegionJUnitTest { @@ -128,4 +133,76 @@ public class BucketRegionJUnitTest extends DistributedRegionJUnitTest { } } + @Test + public void cmnClearRegionWillDoNothingIfNotPrimary() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = mock(BucketAdvisor.class); + RegionVersionVector rvv = mock(RegionVersionVector.class); + doReturn(rvv).when(region).getVersionVector(); + doReturn(ba).when(region).getBucketAdvisor(); + when(ba.isPrimary()).thenReturn(false); + region.cmnClearRegion(event, true, true); + verify(region, never()).clearRegionLocally(eq(event), eq(true), eq(rvv)); + } + + @Test + public void cmnClearRegionCalledOnPrimary() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = mock(BucketAdvisor.class); + RegionVersionVector rvv = mock(RegionVersionVector.class); + doReturn(rvv).when(region).getVersionVector(); + doReturn(true).when(region).getConcurrencyChecksEnabled(); + doReturn(ba).when(region).getBucketAdvisor(); + doNothing().when(region).distributeClearOperation(any(), any(), any()); + doNothing().when(region).lockLocallyForClear(any(), any(), any()); + doNothing().when(region).clearRegionLocally(event, true, null); + when(ba.isPrimary()).thenReturn(true); + region.cmnClearRegion(event, true, true); + verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null)); + } + + @Test + public void clearWillUseNullAsRVVWhenConcurrencyCheckDisabled() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + BucketAdvisor ba = mock(BucketAdvisor.class); + doReturn(false).when(region).getConcurrencyChecksEnabled(); + doReturn(ba).when(region).getBucketAdvisor(); + doNothing().when(region).distributeClearOperation(any(), any(), any()); + doNothing().when(region).lockLocallyForClear(any(), any(), 
any()); + doNothing().when(region).clearRegionLocally(event, true, null); + when(ba.isPrimary()).thenReturn(true); + region.cmnClearRegion(event, true, true); + verify(region, times(1)).clearRegionLocally(eq(event), eq(true), eq(null)); + } + + @Test + public void obtainWriteLocksForClearInBRShouldNotDistribute() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + doNothing().when(region).lockLocallyForClear(any(), any(), any()); + region.obtainWriteLocksForClear(event, null); + assertTrue(region.isUsedForPartitionedRegionBucket()); + } + + @Test + public void updateSizeToZeroOnClearBucketRegion() { + RegionEventImpl event = createClearRegionEvent(); + BucketRegion region = (BucketRegion) event.getRegion(); + PartitionedRegion pr = region.getPartitionedRegion(); + PartitionedRegionDataStore prds = mock(PartitionedRegionDataStore.class); + PartitionedRegionStats prStats = mock(PartitionedRegionStats.class); + when(pr.getPrStats()).thenReturn(prStats); + doNothing().when(prStats).incDataStoreEntryCount(anyInt()); + doNothing().when(prds).updateMemoryStats(anyInt()); + when(pr.getDataStore()).thenReturn(prds); + region.updateSizeOnCreate("key1", 20); + long sizeBeforeClear = region.getTotalBytes(); + assertEquals(20, sizeBeforeClear); + region.updateSizeOnClearRegion((int) sizeBeforeClear); + long sizeAfterClear = region.getTotalBytes(); + assertEquals(0, sizeAfterClear); + } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java index 9fbd8fc..ca53ced 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/DistributedRegionJUnitTest.java @@ -14,6 +14,7 @@ */ package org.apache.geode.internal.cache; +import static org.apache.geode.internal.Assert.fail; import 
static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; @@ -53,6 +54,14 @@ public class DistributedRegionJUnitTest @Override protected void setInternalRegionArguments(InternalRegionArguments ira) {} + protected RegionEventImpl createClearRegionEvent() { + DistributedRegion region = prepare(true, true); + DistributedMember member = mock(DistributedMember.class); + RegionEventImpl regionEvent = new RegionEventImpl(region, Operation.REGION_CLEAR, null, false, + member, true); + return regionEvent; + } + @Override protected DistributedRegion createAndDefineRegion(boolean isConcurrencyChecksEnabled, RegionAttributes ra, InternalRegionArguments ira, GemFireCacheImpl cache, @@ -246,4 +255,13 @@ public class DistributedRegionJUnitTest region.basicBridgeReplace("key1", "value1", false, null, client, true, clientEvent); assertThat(clientEvent.getVersionTag().equals(tag)); } + + @Test(expected = UnsupportedOperationException.class) + public void localClearIsNotSupportedOnReplicatedRegion() { + RegionEventImpl event = createClearRegionEvent(); + DistributedRegion region = (DistributedRegion) event.getRegion(); + region.basicLocalClear(event); + fail("Expect UnsupportedOperationException"); + } + } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java new file mode 100644 index 0000000..2cf5231 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/ClearPRMessageTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. 
The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.partitioned; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.notNull; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Before; +import org.junit.Test; + +import org.apache.geode.distributed.DistributedLockService; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DMStats; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.ReplyException; +import org.apache.geode.distributed.internal.ReplySender; +import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.BucketRegion; +import org.apache.geode.internal.cache.ForceReattemptException; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.internal.cache.PartitionedRegionDataStore; +import org.apache.geode.internal.cache.PartitionedRegionStats; + +public class ClearPRMessageTest { + + ClearPRMessage message; + PartitionedRegion region; + PartitionedRegionDataStore dataStore; + BucketRegion bucketRegion; + + @Before + public void setup() throws ForceReattemptException { + message = spy(new ClearPRMessage()); + region = mock(PartitionedRegion.class, RETURNS_DEEP_STUBS); + dataStore = mock(PartitionedRegionDataStore.class); + when(region.getDataStore()).thenReturn(dataStore); + bucketRegion = mock(BucketRegion.class); + when(dataStore.getInitializedBucketForId(any(), any())).thenReturn(bucketRegion); + } + + @Test + public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAtFirstCheck() { + when(bucketRegion.isPrimary()).thenReturn(false); + + assertThatThrownBy(() -> message.doLocalClear(region)) + .isInstanceOf(ForceReattemptException.class) + .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + } + + @Test + public void doLocalClearThrowsExceptionWhenLockCannotBeObtained() { + DistributedLockService mockLockService = mock(DistributedLockService.class); + doReturn(mockLockService).when(message).getPartitionRegionLockService(); + + when(mockLockService.lock(anyString(), anyLong(), anyLong())).thenReturn(false); + when(bucketRegion.isPrimary()).thenReturn(true); + + assertThatThrownBy(() -> message.doLocalClear(region)) + .isInstanceOf(ForceReattemptException.class) + .hasMessageContaining(ClearPRMessage.BUCKET_REGION_LOCK_UNAVAILABLE_MESSAGE); + } + + @Test + public void doLocalClearThrowsExceptionWhenBucketIsNotPrimaryAfterObtainingLock() { + DistributedLockService mockLockService = 
mock(DistributedLockService.class); + doReturn(mockLockService).when(message).getPartitionRegionLockService(); + + // Be primary on the first check, then be not primary on the second check + when(bucketRegion.isPrimary()).thenReturn(true).thenReturn(false); + when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + + assertThatThrownBy(() -> message.doLocalClear(region)) + .isInstanceOf(ForceReattemptException.class) + .hasMessageContaining(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + // Confirm that we actually obtained and released the lock + verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); + verify(mockLockService, times(1)).unlock(any()); + } + + @Test + public void doLocalClearThrowsForceReattemptExceptionWhenAnExceptionIsThrownDuringClearOperation() { + DistributedLockService mockLockService = mock(DistributedLockService.class); + doReturn(mockLockService).when(message).getPartitionRegionLockService(); + NullPointerException exception = new NullPointerException("Error encountered"); + doThrow(exception).when(bucketRegion).cmnClearRegion(any(), anyBoolean(), anyBoolean()); + + // Be primary on both the first and the second check + when(bucketRegion.isPrimary()).thenReturn(true); + when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + + assertThatThrownBy(() -> message.doLocalClear(region)) + .isInstanceOf(ForceReattemptException.class) + .hasMessageContaining(ClearPRMessage.EXCEPTION_THROWN_DURING_CLEAR_OPERATION); + + // Confirm that cmnClearRegion was called + verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); + } + + @Test + public void doLocalClearInvokesCmnClearRegionWhenBucketIsPrimaryAndLockIsObtained() + throws ForceReattemptException { + DistributedLockService mockLockService = mock(DistributedLockService.class); + doReturn(mockLockService).when(message).getPartitionRegionLockService(); + + + // Be primary on the first check, then be not 
primary on the second check + when(bucketRegion.isPrimary()).thenReturn(true); + when(mockLockService.lock(any(), anyLong(), anyLong())).thenReturn(true); + assertThat(message.doLocalClear(region)).isTrue(); + + // Confirm that cmnClearRegion was called + verify(bucketRegion, times(1)).cmnClearRegion(any(), anyBoolean(), anyBoolean()); + + // Confirm that we actually obtained and released the lock + verify(mockLockService, times(1)).lock(any(), anyLong(), anyLong()); + verify(mockLockService, times(1)).unlock(any()); + } + + @Test + public void initMessageSetsReplyProcessorCorrectlyWithDefinedReplyProcessor() { + InternalDistributedMember sender = mock(InternalDistributedMember.class); + + Set<InternalDistributedMember> recipients = new HashSet<>(); + recipients.add(sender); + + ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class); + int mockProcessorId = 5; + when(mockProcessor.getProcessorId()).thenReturn(mockProcessorId); + + message.initMessage(region, recipients, mockProcessor); + + verify(mockProcessor, times(1)).enableSevereAlertProcessing(); + assertThat(message.getProcessorId()).isEqualTo(mockProcessorId); + } + + @Test + public void initMessageSetsProcessorIdToZeroWithNullProcessor() { + message.initMessage(region, null, null); + + assertThat(message.getProcessorId()).isEqualTo(0); + } + + @Test + public void sendThrowsExceptionIfPutOutgoingMethodReturnsNonNullSetOfFailures() { + InternalDistributedMember recipient = mock(InternalDistributedMember.class); + + DistributionManager distributionManager = mock(DistributionManager.class); + when(region.getDistributionManager()).thenReturn(distributionManager); + + doNothing().when(message).initMessage(any(), any(), any()); + Set<InternalDistributedMember> failures = new HashSet<>(); + failures.add(recipient); + + when(distributionManager.putOutgoing(message)).thenReturn(failures); + + assertThatThrownBy(() -> message.send(recipient, region)) + 
.isInstanceOf(ForceReattemptException.class) + .hasMessageContaining("Failed sending <" + message + ">"); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void operateOnPartitionedRegionCallsSendReplyWithNoExceptionWhenDoLocalClearSucceeds() + throws ForceReattemptException { + ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); + InternalDistributedMember sender = mock(InternalDistributedMember.class); + int processorId = 1000; + int startTime = 0; + + doReturn(true).when(message).doLocalClear(region); + doReturn(sender).when(message).getSender(); + doReturn(processorId).when(message).getProcessorId(); + + // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to + // do nothing and verify later that it was called with proper input + doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong()); + + message.operateOnPartitionedRegion(distributionManager, region, startTime); + + verify(message, times(1)).sendReply(sender, processorId, distributionManager, null, region, + startTime); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void operateOnPartitionedRegionCallsSendReplyWithExceptionWhenDoLocalClearFailsWithException() + throws ForceReattemptException { + ClusterDistributionManager distributionManager = mock(ClusterDistributionManager.class); + InternalDistributedMember sender = mock(InternalDistributedMember.class); + int processorId = 1000; + int startTime = 0; + ForceReattemptException exception = + new ForceReattemptException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + + doThrow(exception).when(message).doLocalClear(region); + doReturn(sender).when(message).getSender(); + doReturn(processorId).when(message).getProcessorId(); + + // We don't want to deal with mocking the behavior of sendReply() in this test, so we mock it to + // do nothing and verify later that it was called with proper input + 
doNothing().when(message).sendReply(any(), anyInt(), any(), any(), any(), anyLong()); + + message.operateOnPartitionedRegion(distributionManager, region, startTime); + + verify(message, times(1)).sendReply(any(), anyInt(), any(), notNull(), any(), anyLong()); + } + + @Test + public void sendReplyEndsMessageProcessingIfWeHaveARegionAndHaveStartedProcessing() { + DistributionManager distributionManager = mock(DistributionManager.class); + InternalDistributedMember recipient = mock(InternalDistributedMember.class); + PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class); + when(region.getPrStats()).thenReturn(partitionedRegionStats); + + int processorId = 1000; + int startTime = 10000; + ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + + ReplySender replySender = mock(ReplySender.class); + doReturn(replySender).when(message).getReplySender(distributionManager); + + message.sendReply(recipient, processorId, distributionManager, exception, region, startTime); + + verify(partitionedRegionStats, times(1)).endPartitionMessagesProcessing(startTime); + } + + @Test + public void sendReplyDoesNotEndMessageProcessingIfStartTimeIsZero() { + DistributionManager distributionManager = mock(DistributionManager.class); + InternalDistributedMember recipient = mock(InternalDistributedMember.class); + PartitionedRegionStats partitionedRegionStats = mock(PartitionedRegionStats.class); + when(region.getPrStats()).thenReturn(partitionedRegionStats); + + int processorId = 1000; + int startTime = 0; + ReplyException exception = new ReplyException(ClearPRMessage.BUCKET_NON_PRIMARY_MESSAGE); + + ReplySender replySender = mock(ReplySender.class); + doReturn(replySender).when(message).getReplySender(distributionManager); + + message.sendReply(recipient, processorId, distributionManager, exception, region, startTime); + + verify(partitionedRegionStats, times(0)).endPartitionMessagesProcessing(startTime); + } + + @Test + 
public void clearReplyMessageProcessCallsSetResponseIfReplyProcessorIsInstanceOfClearResponse() { + DistributionManager distributionManager = mock(DistributionManager.class); + DMStats mockStats = mock(DMStats.class); + when(distributionManager.getStats()).thenReturn(mockStats); + ClearPRMessage.ClearReplyMessage clearReplyMessage = new ClearPRMessage.ClearReplyMessage(); + ClearPRMessage.ClearResponse mockProcessor = mock(ClearPRMessage.ClearResponse.class); + + clearReplyMessage.process(distributionManager, mockProcessor); + + verify(mockProcessor, times(1)).setResponse(clearReplyMessage); + } +} diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java index e00dd64..0f5ee97 100644 --- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java +++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java @@ -57,6 +57,9 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer // NOTE, codes < -65536 will take 4 bytes to serialize // NOTE, codes < -128 will take 2 bytes to serialize + short PR_CLEAR_REPLY_MESSAGE = -161; + short PR_CLEAR_MESSAGE = -160; + short CREATE_REGION_MESSAGE_LUCENE = -159; short FINAL_CHECK_PASSED_MESSAGE = -158; short NETWORK_PARTITION_MESSAGE = -157;