This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new 85953f0 GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375) 85953f0 is described below commit 85953f085b13815405cb9bf3c5b1bca77c3c9a5e Author: Nabarun Nag <nabarun...@users.noreply.github.com> AuthorDate: Mon Aug 27 09:37:45 2018 -0700 GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375) * After GII a flag is set to indicate that failedBatchRemovalMessageKeys has been processed * If this flag is set, no more entries will be put into failedBatchRemovalMessageKeys. --- .../internal/cache/AbstractBucketRegionQueue.java | 12 +++++++++++ .../geode/internal/cache/BucketRegionQueue.java | 1 + .../wan/parallel/ParallelQueueRemovalMessage.java | 10 ++++++--- .../ParallelQueueRemovalMessageJUnitTest.java | 24 ++++++++++++++++++++++ 4 files changed, 44 insertions(+), 3 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java index fe95659..fc06ceb 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java @@ -501,6 +501,18 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion { failedBatchRemovalMessageKeys.add(key); } + public boolean isFailedBatchRemovalMessageKeysClearedFlag() { + return failedBatchRemovalMessageKeysClearedFlag; + } + + public void setFailedBatchRemovalMessageKeysClearedFlag( + boolean failedBatchRemovalMessageKeysClearedFlag) { + this.failedBatchRemovalMessageKeysClearedFlag = failedBatchRemovalMessageKeysClearedFlag; + } + + private boolean failedBatchRemovalMessageKeysClearedFlag = false; + + public ConcurrentHashSet<Object> getFailedBatchRemovalMessageKeys() { return this.failedBatchRemovalMessageKeys; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java index 314d0cc..712c0b8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java @@ -188,6 +188,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue { } } } + setFailedBatchRemovalMessageKeysClearedFlag(true); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java index 6d47266..401094d 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java @@ -187,7 +187,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { } } - private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key, + void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key, PartitionedRegion prQ) { final boolean isDebugEnabled = logger.isDebugEnabled(); try { @@ -207,8 +207,12 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage { } // add the key to failedBatchRemovalMessageQueue. // This is to handle the last scenario in #49196 - brq.addToFailedBatchRemovalMessageKeys(key); - + // But if GII is already completed and FailedBatchRemovalMessageKeys + // are already cleared then no keys should be added to it as they will + // never be cleared and increase the memory footprint. + if (!brq.isFailedBatchRemovalMessageKeysClearedFlag()) { + brq.addToFailedBatchRemovalMessageKeys(key); + } } catch (ForceReattemptException fe) { if (isDebugEnabled) { logger.debug( diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java index 7633044..6a5b495 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java @@ -17,10 +17,14 @@ package org.apache.geode.internal.cache.wan.parallel; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; @@ -37,6 +41,7 @@ import org.mockito.stubbing.Answer; import org.apache.geode.cache.AttributesFactory; import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.EntryNotFoundException; import org.apache.geode.cache.EvictionAction; import org.apache.geode.cache.EvictionAttributes; import org.apache.geode.cache.Operation; @@ -46,11 +51,13 @@ import org.apache.geode.cache.RegionAttributes; import org.apache.geode.cache.Scope; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.internal.cache.AbstractBucketRegionQueue; import org.apache.geode.internal.cache.BucketAdvisor; import org.apache.geode.internal.cache.BucketRegionQueue; import org.apache.geode.internal.cache.BucketRegionQueueHelper; import org.apache.geode.internal.cache.EntryEventImpl; import org.apache.geode.internal.cache.EvictionAttributesImpl; +import org.apache.geode.internal.cache.ForceReattemptException; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalRegionArguments; import org.apache.geode.internal.cache.KeyInfo; @@ -180,6 +187,23 @@ public class ParallelQueueRemovalMessageJUnitTest { } @Test + public void ifIsFailedBatchRemovalMessageKeysClearedFlagSetThenAddToFailedBatchRemovalMessageKeysNotCalled() + throws ForceReattemptException { + ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(); + Object object = new Object(); + PartitionedRegion partitionedRegion = mock(PartitionedRegion.class); + AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class); + doThrow(new EntryNotFoundException("ENTRY NOT FOUND")).when(brq).destroyKey(object); + when(brq.isFailedBatchRemovalMessageKeysClearedFlag()).thenReturn(true); + doNothing().when(brq).addToFailedBatchRemovalMessageKeys(object); + pqrm.destroyKeyFromBucketQueue(brq, object, partitionedRegion); + verify(brq, times(1)).destroyKey(object); + verify(brq, times(1)).isFailedBatchRemovalMessageKeysClearedFlag(); + verify(brq, times(0)).addToFailedBatchRemovalMessageKeys(object); + + } + + @Test public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue() throws Exception { // Validate initial BucketRegionQueue state