GEODE-3049: Add warning on loss of all buckets in region * Add statistics for count of buckets that have no copies remaining * Refactor logic for keeping track of redundancy statistics into separate classes * Remove dependency of tracking on the PartitionRegionStatistics (since this can be turned off), but continue recording statistics to that location
This closes #577 Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b7db727a Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b7db727a Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b7db727a Branch: refs/heads/feature/GEM-1483 Commit: b7db727ad8477806e7884d16624cd72bd6ba26e0 Parents: 0131292 Author: Nick Reich <nre...@pivotal.io> Authored: Mon Jun 12 17:29:16 2017 -0700 Committer: Darrel Schneider <dschnei...@pivotal.io> Committed: Mon Jul 24 17:52:43 2017 -0700 ---------------------------------------------------------------------- .../geode/internal/cache/BucketAdvisor.java | 118 ++-------------- .../internal/cache/BucketRedundancyTracker.java | 122 ++++++++++++++++ .../geode/internal/cache/PartitionedRegion.java | 8 ++ .../PartitionedRegionRedundancyTracker.java | 139 +++++++++++++++++++ .../internal/cache/PartitionedRegionStats.java | 13 ++ .../cache/partitioned/RegionAdvisor.java | 18 --- .../cache/BucketRedundancyTrackerTest.java | 124 +++++++++++++++++ .../PartitionedRegionRedundancyTrackerTest.java | 120 ++++++++++++++++ 8 files changed, 537 insertions(+), 125 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java index 0c58963..e8fabb7 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketAdvisor.java @@ -89,6 +89,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor { */ protected final RegionAdvisor regionAdvisor; + private final BucketRedundancyTracker redundancyTracker; + /** * The bucket primary will be holding this distributed lock. Protected by synchronized(this). */ @@ -128,25 +130,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor { static private final Random myRand = new Random(); /** - * Used by {@link #updateRedundancy()} to determine if stat change is required. Access and - * mutation are done while synchronized on this advisor. - * - * @guarded.By this - */ - private boolean redundancySatisfied = true; - - /** - * Used by {@link #incLowRedundancyBucketCount(int)} to determine if redundancy for this bucket - * has ever been satisfied. Only buckets which lose redundancy after having redundancy will - * generate a redundancy loss alert. - * <p> - * Access and mutation are done while synchronized on this advisor. - * - * @guarded.By this - */ - private boolean redundancyEverSatisfied = false; - - /** * A read/write lock to prevent making this bucket not primary while a write is in progress on the * bucket. */ @@ -160,8 +143,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor { */ private BucketAdvisor parentAdvisor; - private volatile int redundancy = -1; - /** * The member that is responsible for choosing the primary for this bucket. While this field is * set and this member exists, this bucket won't try to become primary. @@ -188,6 +169,8 @@ public class BucketAdvisor extends CacheDistributionAdvisor { super(bucket); this.regionAdvisor = regionAdvisor; this.pRegion = this.regionAdvisor.getPartitionedRegion(); + this.redundancyTracker = + new BucketRedundancyTracker(pRegion.getRedundantCopies(), pRegion.getRedundancyTracker()); resetParentAdvisor(bucket.getId()); } @@ -365,64 +348,6 @@ public class BucketAdvisor extends CacheDistributionAdvisor { } /** - * Increment or decrement lowRedundancyBucketCount stat and generate alert only once per loss of - * redundancy for PR but only if redundancy has ever been satisfied. - * <p> - * Caller must synchronize on this BucketAdvisor. - * - * @param val the value to increment or decrement by - * @guarded.By this - */ - private void incLowRedundancyBucketCount(int val) { - final int HAS_LOW_REDUNDANCY = 0; - final int ALREADY_GENERATED_WARNING = 1; - - final PartitionedRegionStats stats = getPartitionedRegionStats(); - final boolean[] lowRedundancyFlags = this.regionAdvisor.getLowRedundancyFlags(); - final int configuredRedundancy = this.pRegion.getRedundantCopies(); - - synchronized (lowRedundancyFlags) { - stats.incLowRedundancyBucketCount(val); - - if (stats.getLowRedundancyBucketCount() == 0) { - // all buckets are fully redundant - lowRedundancyFlags[HAS_LOW_REDUNDANCY] = false; // reset - lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false; // reset - stats.setActualRedundantCopies(configuredRedundancy); - } - - else { - // one or more buckets are not fully redundant - int numBucketHosts = getBucketRedundancy() + 1; - int actualRedundancy = Math.max(numBucketHosts - 1, 0); // zero or more - - if (actualRedundancy < stats.getActualRedundantCopies()) { - // need to generate an alert for this lower actual redundancy - lowRedundancyFlags[ALREADY_GENERATED_WARNING] = false; - } - - if (!lowRedundancyFlags[HAS_LOW_REDUNDANCY] - || !lowRedundancyFlags[ALREADY_GENERATED_WARNING]) { - // either we have lower redundancy or we never generated an alert - - lowRedundancyFlags[HAS_LOW_REDUNDANCY] = true; - stats.setActualRedundantCopies(actualRedundancy); - - // this bucket will only generate alert if redundancyEverSatisfied - if (!lowRedundancyFlags[ALREADY_GENERATED_WARNING] && this.redundancyEverSatisfied) { - - lowRedundancyFlags[ALREADY_GENERATED_WARNING] = true; - logger.warn(LocalizedMessage.create( - LocalizedStrings.BucketAdvisor_REDUNDANCY_HAS_DROPPED_BELOW_0_CONFIGURED_COPIES_TO_1_ACTUAL_COPIES_FOR_2, - new Object[] {Integer.valueOf(configuredRedundancy), - Integer.valueOf(actualRedundancy), this.pRegion.getFullPath()})); - } - } - } - } - } - - /** * Return (and possibly choose) a thread-sticky member from whose data store this bucket's values * should be read * @@ -1007,10 +932,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { wasPrimary = isPrimary(); super.close(); this.requestPrimaryState(CLOSED); - if (!this.redundancySatisfied) { - incLowRedundancyBucketCount(-1); - this.redundancySatisfied = true; - } + this.redundancyTracker.closeBucket(); this.localProfile = null; } if (wasPrimary) { @@ -1797,32 +1719,14 @@ public class BucketAdvisor extends CacheDistributionAdvisor { } /** - * Determine if there has been a change in redundancy and alter the lowRedundancyBucketCount stat - * as needed. - * - * Also updates a counter used to track the redundancy of this member - * - * @return current number of hosts for this bucket - * @see #redundancySatisfied - * @see PartitionedRegionStats#incLowRedundancyBucketCount(int) - * @guarded.By this + * Get the current number of bucket hosts and update the redundancy statistics for the region + * + * @return number of current bucket hosts */ private int updateRedundancy() { - int desiredRedundancy = this.pRegion.getRedundantCopies(); int numBucketHosts = getNumInitializedBuckets(); - if (isClosed()) { - return numBucketHosts; - } - int actualRedundancy = numBucketHosts - 1; - this.redundancy = actualRedundancy; - if (this.redundancySatisfied && numBucketHosts > 0 && actualRedundancy < desiredRedundancy) { - incLowRedundancyBucketCount(1); - this.redundancySatisfied = false; - } else if (!this.redundancySatisfied && numBucketHosts > 0 - && actualRedundancy >= desiredRedundancy) { - incLowRedundancyBucketCount(-1); - this.redundancySatisfied = true; - this.redundancyEverSatisfied = true; + if (!isClosed()) { + redundancyTracker.updateStatistics(numBucketHosts); } return numBucketHosts; } @@ -1882,7 +1786,7 @@ public class BucketAdvisor extends CacheDistributionAdvisor { * @return current number of hosts of this bucket ; -1 if there are no hosts */ public int getBucketRedundancy() { - return redundancy; + return redundancyTracker.getCurrentRedundancy(); } public Set<InternalDistributedMember> adviseInitialized() { http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java new file mode 100644 index 0000000..2883744 --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRedundancyTracker.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +/** + * Keeps track of redundancy status for a bucket in a PartitionedRegion and update the region's + * {@link PartitionedRegionRedundancyTracker} of the bucket's status for the region. + */ +class BucketRedundancyTracker { + private boolean redundancySatisfied = false; + private boolean hasAnyCopies = false; + private boolean redundancyEverSatisfied = false; + private boolean hasEverHadCopies = false; + private volatile int currentRedundancy = -1; + private final int targetRedundancy; + private final PartitionedRegionRedundancyTracker regionRedundancyTracker; + + /** + * Creates a new BucketRedundancyTracker + * + * @param redundantCopies the number of redundant copies specified for the + * {@link PartitionedRegion} of this bucket + * @param regionRedundancyTracker the redundancy tracker for the {@link PartitionedRegion} of this + * bucket + */ + BucketRedundancyTracker(int redundantCopies, + PartitionedRegionRedundancyTracker regionRedundancyTracker) { + this.targetRedundancy = redundantCopies; + this.regionRedundancyTracker = regionRedundancyTracker; + } + + /** + * Adjust statistics based on closing a bucket + */ + synchronized void closeBucket() { + if (!redundancySatisfied) { + regionRedundancyTracker.decrementLowRedundancyBucketCount(); + redundancySatisfied = true; + } + if (hasEverHadCopies && !hasAnyCopies) { + regionRedundancyTracker.decrementNoCopiesBucketCount(); + hasAnyCopies = true; + } + } + + /** + * Determines if there has been a change in current redundancy and updates statistics on + * redundancy for the region of the bucket for this tracker + * + * @param currentBucketHosts number of current hosts for the bucket + */ + synchronized void updateStatistics(int currentBucketHosts) { + updateRedundancyStatistics(currentBucketHosts); + updateNoCopiesStatistics(currentBucketHosts); + } + + /** + * Provides the current redundancy of the bucket for this tracker + * + * @return number of redundant copies of the bucket for this tracker + */ + int getCurrentRedundancy() { + return currentRedundancy; + } + + private void updateNoCopiesStatistics(int currentBucketHosts) { + if (hasAnyCopies && currentBucketHosts == 0) { + hasAnyCopies = false; + regionRedundancyTracker.incrementNoCopiesBucketCount(); + } else if (!hasAnyCopies && currentBucketHosts > 0) { + if (hasEverHadCopies) { + regionRedundancyTracker.decrementNoCopiesBucketCount(); + } + hasEverHadCopies = true; + hasAnyCopies = true; + } + } + + private void updateRedundancyStatistics(int updatedBucketHosts) { + int updatedRedundancy = updatedBucketHosts - 1; + updateCurrentRedundancy(updatedRedundancy); + + if (updatedRedundancy < targetRedundancy) { + reportUpdatedBucketCount(updatedBucketHosts); + if (redundancySatisfied) { + regionRedundancyTracker.incrementLowRedundancyBucketCount(); + redundancySatisfied = false; + } else if (!hasAnyCopies && updatedRedundancy >= 0) { + regionRedundancyTracker.incrementLowRedundancyBucketCount(); + } + } else if (!redundancySatisfied && updatedRedundancy == targetRedundancy) { + regionRedundancyTracker.decrementLowRedundancyBucketCount(); + redundancySatisfied = true; + redundancyEverSatisfied = true; + } + } + + private void updateCurrentRedundancy(int updatedRedundancy) { + if (updatedRedundancy != currentRedundancy) { + regionRedundancyTracker.setActualRedundancy(updatedRedundancy); + currentRedundancy = updatedRedundancy; + } + } + + private void reportUpdatedBucketCount(int updatedBucketHosts) { + if (redundancyEverSatisfied) { + regionRedundancyTracker.reportBucketCount(updatedBucketHosts); + } + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java index 27b442d..8410c00 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java @@ -569,6 +569,10 @@ public class PartitionedRegion extends LocalRegion } }; + PartitionedRegionRedundancyTracker getRedundancyTracker() { + return redundancyTracker; + } + public static class PRIdMap extends HashMap { private static final long serialVersionUID = 3667357372967498179L; @@ -713,6 +717,8 @@ public class PartitionedRegion extends LocalRegion private AbstractGatewaySender parallelGatewaySender = null; + private final PartitionedRegionRedundancyTracker redundancyTracker; + /** * Constructor for a PartitionedRegion. This has an accessor (Region API) functionality and * contains a datastore for actual storage. An accessor can act as a local cache by having a local @@ -754,6 +760,8 @@ public class PartitionedRegion extends LocalRegion // getScope is overridden to return the correct scope. // this.scope = Scope.LOCAL; this.redundantCopies = regionAttributes.getPartitionAttributes().getRedundantCopies(); + this.redundancyTracker = new PartitionedRegionRedundancyTracker(this.totalNumberOfBuckets, + this.redundantCopies, this.prStats, getFullPath()); this.prStats.setConfiguredRedundantCopies( regionAttributes.getPartitionAttributes().getRedundantCopies()); this.prStats.setLocalMaxMemory( http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java new file mode 100644 index 0000000..38ef61b --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTracker.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import org.apache.geode.internal.i18n.LocalizedStrings; +import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.logging.log4j.LocalizedMessage; +import org.apache.logging.log4j.Logger; + +/** + * Keeps track redundancy statistics across the buckets of a given {@link PartitionedRegion} + */ +class PartitionedRegionRedundancyTracker { + private static final Logger logger = LogService.getLogger(); + + private final PartitionedRegionStats stats; + private final String regionPath; + private final int totalBuckets; + private final int targetRedundancy; + + private int lowRedundancyBuckets; + private int noCopiesBuckets; + private int lowestBucketCopies; + + /** + * Creates a new PartitionedRegionRedundancyTracker + * + * @param totalBuckets number of buckets in the region to track + * @param redundantCopies number of redundant copies specified on the region to track + * @param stats the statistics container for the region to track + * @param regionPath the full path of the region to track + */ + PartitionedRegionRedundancyTracker(int totalBuckets, int redundantCopies, + PartitionedRegionStats stats, String regionPath) { + this.stats = stats; + this.regionPath = regionPath; + this.totalBuckets = totalBuckets; + this.targetRedundancy = redundantCopies; + this.lowestBucketCopies = redundantCopies + 1; + } + + /** + * Since consistency was last reached, provides the lowest number of copies of a bucket that have + * been remaining across all the buckets in the region + * + * @return the number of copies of the bucket with the least copies available + */ + int getLowestBucketCopies() { + return lowestBucketCopies; + } + + /** + * Increments the count of buckets that do not meet redundancy + */ + synchronized void incrementLowRedundancyBucketCount() { + if (lowRedundancyBuckets == totalBuckets) { + return; + } + lowRedundancyBuckets++; + stats.incLowRedundancyBucketCount(1); + } + + /** + * Updates the count of copies for the bucket with the least copies if a new low has been reached + * + * @param bucketCopies number of copies of a bucket remaining + */ + synchronized void reportBucketCount(int bucketCopies) { + if (bucketCopies < lowestBucketCopies) { + lowestBucketCopies = bucketCopies; + logger.warn(LocalizedMessage.create( + LocalizedStrings.BucketAdvisor_REDUNDANCY_HAS_DROPPED_BELOW_0_CONFIGURED_COPIES_TO_1_ACTUAL_COPIES_FOR_2, + new Object[] {targetRedundancy + 1, bucketCopies, regionPath})); + } + } + + /** + * Increments the count of buckets that no longer have any copies remaining + */ + synchronized void incrementNoCopiesBucketCount() { + if (noCopiesBuckets == totalBuckets) { + return; + } + noCopiesBuckets++; + stats.incNoCopiesBucketCount(1); + if (noCopiesBuckets == 1) { + logger.warn("All in memory copies of some data have been lost for " + regionPath); + } + } + + /** + * Decrements the count of buckets that do not meet redundancy + */ + synchronized void decrementLowRedundancyBucketCount() { + if (lowRedundancyBuckets == 0) { + return; + } + lowRedundancyBuckets--; + stats.incLowRedundancyBucketCount(-1); + if (lowRedundancyBuckets == 0) { + lowestBucketCopies = targetRedundancy + 1; + logger.info("Configured redundancy of " + (targetRedundancy + 1) + + " copies has been restored to " + regionPath); + } + } + + /** + * Decrements the count of buckets that no longer have any copies remaining + */ + synchronized void decrementNoCopiesBucketCount() { + if (noCopiesBuckets == 0) { + return; + } + noCopiesBuckets--; + stats.incNoCopiesBucketCount(-1); + // if the last bucket with no copies has gained a copy, the bucket with the lowest + // number of copies (that bucket) has one copy + if (noCopiesBuckets == 0) { + lowestBucketCopies = 1; + } + } + + void setActualRedundancy(int actualRedundancy) { + stats.setActualRedundantCopies(actualRedundancy); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java index d1dc1be..535e877 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionStats.java @@ -138,6 +138,8 @@ public class PartitionedRegionStats { private final static int volunteeringThreadsId; // number of threads actively volunteering private final static int lowRedundancyBucketCountId; // number of buckets currently without full // redundancy + private final static int noCopiesBucketCountId; // number of buckets currently without any + // redundancy private final static int configuredRedundantCopiesId; private final static int actualRedundantCopiesId; @@ -308,6 +310,8 @@ public class PartitionedRegionStats { "Current number of threads volunteering for primary.", "threads"), f.createIntGauge("lowRedundancyBucketCount", "Current number of buckets without full redundancy.", "buckets"), + f.createIntGauge("noCopiesBucketCount", + "Current number of buckets without any copies remaining.", "buckets"), f.createIntGauge("configuredRedundantCopies", "Configured number of redundant copies for this partitioned region.", "copies"), f.createIntGauge("actualRedundantCopies", @@ -477,6 +481,7 @@ public class PartitionedRegionStats { primaryBucketCountId = type.nameToId("primaryBucketCount"); volunteeringThreadsId = type.nameToId("volunteeringThreads"); lowRedundancyBucketCountId = type.nameToId("lowRedundancyBucketCount"); + noCopiesBucketCountId = type.nameToId("noCopiesBucketCount"); getEntriesCompletedId = type.nameToId("getEntryCompleted"); getEntryTimeId = type.nameToId("getEntryTime"); @@ -899,10 +904,18 @@ public class PartitionedRegionStats { return this.stats.getInt(lowRedundancyBucketCountId); } + public int getNoCopiesBucketCount() { + return this.stats.getInt(noCopiesBucketCountId); + } + public void incLowRedundancyBucketCount(int val) { this.stats.incInt(lowRedundancyBucketCountId, val); } + public void incNoCopiesBucketCount(int val) { + this.stats.incInt(noCopiesBucketCountId, val); + } + public int getConfiguredRedundantCopies() { return this.stats.getInt(configuredRedundantCopiesId); } http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java index c473d2d..63a694a 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RegionAdvisor.java @@ -80,26 +80,8 @@ public class RegionAdvisor extends CacheDistributionAdvisor { private Queue preInitQueue; private final Object preInitQueueMonitor = new Object(); - /** - * Used by to generate redundancy loss alert only once even if more than one bucket or PR has lost - * redundancy. lowRedundancyFlags[0] is true if any bucket in this partitioned region has lower - * than configured redundancy. lowRedundancyFlags[1] is true if a warning has been generated for - * the current actual redundancy of this partitioned region. The caller must synchronize on - * lowRedundancyFlags in order to maintain atomicity of overall redundancy status and alert. - */ - private final boolean[] lowRedundancyFlags = new boolean[2]; - private ConcurrentHashMap<Integer, Set<ServerBucketProfile>> clientBucketProfilesMap; - /** - * Caller must synchronize on the return value. - * - * @return the low redundancy flags for this partitioned region - */ - public boolean[] getLowRedundancyFlags() { - return lowRedundancyFlags; - } - private RegionAdvisor(PartitionedRegion region) { super(region); synchronized (this.preInitQueueMonitor) { http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java new file mode 100644 index 0000000..080cec1 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRedundancyTrackerTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(UnitTest.class) +public class BucketRedundancyTrackerTest { + private static final int TARGET_COPIES = 2; + + private PartitionedRegionRedundancyTracker regionRedundancyTracker; + private BucketRedundancyTracker bucketRedundancyTracker; + + @Before + public void setup() { + regionRedundancyTracker = mock(PartitionedRegionRedundancyTracker.class); + bucketRedundancyTracker = + new BucketRedundancyTracker(TARGET_COPIES - 1, regionRedundancyTracker); + } + + @Test + public void whenRedundancyNeverMetDoesNotWarnOnLowRedundancy() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + verify(regionRedundancyTracker, never()).reportBucketCount(anyInt()); + } + + @Test + public void incrementsBucketCountOnLowRedundancyForBucket() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + verify(regionRedundancyTracker, times(1)).incrementLowRedundancyBucketCount(); + assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void decrementsBucketCountOnRegainingRedundancyForBucket() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + verify(regionRedundancyTracker, times(2)).decrementLowRedundancyBucketCount(); + assertEquals(TARGET_COPIES - 1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void decrementsBucketCountOnClosingBucketBelowRedundancy() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + bucketRedundancyTracker.closeBucket(); + verify(regionRedundancyTracker, times(2)).decrementLowRedundancyBucketCount(); + assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void decrementsBucketCountOnClosingABucketWithNoCopies() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + bucketRedundancyTracker.updateStatistics(0); + bucketRedundancyTracker.closeBucket(); + verify(regionRedundancyTracker, times(2)).decrementLowRedundancyBucketCount(); + assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void bucketCountNotDecrementedOnClosingBucketThatNeverHadCopies() { + verify(regionRedundancyTracker, never()).decrementLowRedundancyBucketCount(); + assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void doesNotWarnWhenNeverHadAnyCopies() { + bucketRedundancyTracker.updateStatistics(0); + verify(regionRedundancyTracker, never()).reportBucketCount(anyInt()); + assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void incrementsBucketCountOnHavingNoCopiesForBucket() { + bucketRedundancyTracker.updateStatistics(1); + bucketRedundancyTracker.updateStatistics(0); + verify(regionRedundancyTracker, times(1)).incrementNoCopiesBucketCount(); + assertEquals(-1, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void decrementsBucketCountOnHavingAtLeastOneCopyOfBucket() { + bucketRedundancyTracker.updateStatistics(1); + bucketRedundancyTracker.updateStatistics(0); + bucketRedundancyTracker.updateStatistics(1); + verify(regionRedundancyTracker, times(1)).decrementNoCopiesBucketCount(); + assertEquals(0, bucketRedundancyTracker.getCurrentRedundancy()); + } + + @Test + public void updatesRedundancyOnlyIfChanged() { + bucketRedundancyTracker.updateStatistics(TARGET_COPIES - 1); + verify(regionRedundancyTracker, times(1)).setActualRedundancy(TARGET_COPIES - 2); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + verify(regionRedundancyTracker, times(1)).setActualRedundancy(TARGET_COPIES - 1); + bucketRedundancyTracker.updateStatistics(TARGET_COPIES); + verify(regionRedundancyTracker, times(2)).setActualRedundancy(anyInt()); + } +} http://git-wip-us.apache.org/repos/asf/geode/blob/b7db727a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java new file mode 100644 index 0000000..0917835 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionRedundancyTrackerTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.geode.internal.cache; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.verifyZeroInteractions; + +import org.apache.geode.test.junit.categories.UnitTest; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(UnitTest.class) +public class PartitionedRegionRedundancyTrackerTest { + private static final int TARGET_COPIES = 2; + private static final int TOTAL_BUCKETS = 3; + + private PartitionedRegionStats stats; + private PartitionedRegionRedundancyTracker redundancyTracker; + + @Before + public void setup() { + stats = mock(PartitionedRegionStats.class); + redundancyTracker = new PartitionedRegionRedundancyTracker(TOTAL_BUCKETS, TARGET_COPIES - 1, + stats, "testRegion"); + } + + @Test + public void incrementsAndDecrementsLowRedundancyBucketCount() { + redundancyTracker.incrementLowRedundancyBucketCount(); + verify(stats, times(1)).incLowRedundancyBucketCount(1); + redundancyTracker.decrementLowRedundancyBucketCount(); + verify(stats, times(1)).incLowRedundancyBucketCount(-1); + } + + @Test + public void willNotIncrementLowRedundancyBucketCountBeyondTotalsBuckets() { + + for (int i = 0; i < TOTAL_BUCKETS; i++) { + redundancyTracker.incrementLowRedundancyBucketCount(); + } + verify(stats, times(TOTAL_BUCKETS)).incLowRedundancyBucketCount(1); + redundancyTracker.incrementLowRedundancyBucketCount(); + verifyNoMoreInteractions(stats); + } + + @Test + public void willNotDecrementLowRedundancyBucketCountBelowZero() { + redundancyTracker.decrementLowRedundancyBucketCount(); + verifyZeroInteractions(stats); + } + + @Test + public void incrementsAndDecrementsNoCopiesBucketCount() { + redundancyTracker.incrementNoCopiesBucketCount(); + verify(stats, times(1)).incNoCopiesBucketCount(1); + redundancyTracker.decrementNoCopiesBucketCount(); + verify(stats, times(1)).incNoCopiesBucketCount(-1); + } + + @Test + public void willNotIncrementNoCopiesBucketCountBeyondTotalsBuckets() { + + for (int i = 0; i < TOTAL_BUCKETS; i++) { + redundancyTracker.incrementNoCopiesBucketCount(); + } + verify(stats, times(TOTAL_BUCKETS)).incNoCopiesBucketCount(1); + redundancyTracker.incrementNoCopiesBucketCount(); + verifyNoMoreInteractions(stats); + } + + @Test + public void willNotDecrementNoCopiesBucketCountBelowZero() { + redundancyTracker.decrementNoCopiesBucketCount(); + verify(stats, times(0)).incNoCopiesBucketCount(-1); + } + + @Test + public void reportsCorrectLowestBucketCopies() { + redundancyTracker.reportBucketCount(1); + assertEquals(1, redundancyTracker.getLowestBucketCopies()); + redundancyTracker.reportBucketCount(0); + assertEquals(0, redundancyTracker.getLowestBucketCopies()); + redundancyTracker.reportBucketCount(1); + assertEquals(0, redundancyTracker.getLowestBucketCopies()); + } + + @Test + public void lowestBucketCopiesResetsOnRedundancyRegained() { + redundancyTracker.incrementLowRedundancyBucketCount(); + redundancyTracker.reportBucketCount(1); + redundancyTracker.decrementLowRedundancyBucketCount(); + assertEquals(2, redundancyTracker.getLowestBucketCopies()); + } + + @Test + public void lowestBucketCopiesSetToOneOnHavingABucketAgain() { + redundancyTracker.incrementNoCopiesBucketCount(); + redundancyTracker.reportBucketCount(0); + redundancyTracker.decrementNoCopiesBucketCount(); + assertEquals(1, redundancyTracker.getLowestBucketCopies()); + } +}