This is an automated email from the ASF dual-hosted git repository. mhanson pushed a commit to branch support/1.14 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.14 by this push: new 142e06f GEODE-9554: Change up the rebalance calls to use new canDelete call (#6845) (#6853) 142e06f is described below commit 142e06fe001aae428c32ec602992e45bcefa2259 Author: mhansonp <hans...@vmware.com> AuthorDate: Thu Sep 9 17:31:41 2021 -0700 GEODE-9554: Change up the rebalance calls to use new canDelete call (#6845) (#6853) GEODE-9554: Change up the rebalance calls to use new canDelete call If there is no redundancy zone, exit and allow delete Adding xml files to git Adding xml files to git Added new tests (cherry picked from commit d1d605b24787698c5d8f47b8538808d6b990c0a4) --- .../RebalanceOperationComplexDistributedTest.java | 272 +++++++++++++++++++++ .../cache/RebalanceOperationComplex-client.xml | 35 +++ .../cache/RebalanceOperationComplex-server.xml | 48 ++++ .../internal/cache/PartitionedRegionDataStore.java | 9 - .../control/PartitionRebalanceDetailsImpl.java | 21 ++ .../partitioned/PartitionedRegionRebalanceOp.java | 5 +- .../cache/partitioned/rebalance/model/Member.java | 61 ++++- .../model/PartitionedRegionLoadModel.java | 46 +++- .../partitioned/rebalance/model/RefusalReason.java | 13 +- .../cache/partitioned/rebalance/MemberTest.java | 107 ++++++++ .../PartitionedRegionLoadModelJUnitTest.java | 67 ++++- 11 files changed, 652 insertions(+), 32 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java new file mode 100644 index 0000000..eb6d536 --- /dev/null +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. 
See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.cache.control; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE; +import static org.apache.geode.internal.lang.SystemPropertyHelper.DEFAULT_DISK_DIRS_PROPERTY; +import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.Serializable; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.apache.commons.io.FileUtils; +import org.apache.logging.log4j.Logger; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.client.ClientCache; +import org.apache.geode.cache.control.ResourceManager; +import org.apache.geode.internal.cache.PartitionedRegion; +import org.apache.geode.logging.internal.log4j.api.LogService; 
+import org.apache.geode.test.awaitility.GeodeAwaitility; +import org.apache.geode.test.dunit.VM; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +/** + * The purpose of RebalanceOperationComplexDistributedTest is to test rebalances + * across zones and to ensure that enforceUniqueZone behavior of redundancy zones + * is working correctly. + */ +@RunWith(JUnitParamsRunner.class) +public class RebalanceOperationComplexDistributedTest implements Serializable { + public static final int EXPECTED_BUCKET_COUNT = 113; + public static final long TIMEOUT_SECONDS = GeodeAwaitility.getTimeout().getSeconds(); + public static final String CLIENT_XML = "RebalanceOperationComplex-client.xml"; + public static final String SERVER_XML = "RebalanceOperationComplex-server.xml"; + public static final String REGION_NAME = "primary"; + public static final String COLOCATED_REGION_NAME = "colocated"; + public static final Logger logger = LogService.getLogger(); + + public static final String ZONE_A = "zoneA"; + public static final String ZONE_B = "zoneB"; + public int locatorPort; + public static final AtomicInteger runID = new AtomicInteger(0); + public String workingDir; + + // 6 servers distributed evenly across 2 zones + public static final Map<Integer, String> SERVER_ZONE_MAP = new HashMap<Integer, String>() { + { + put(1, ZONE_A); + put(2, ZONE_A); + put(3, ZONE_A); + put(4, ZONE_B); + put(5, ZONE_B); + put(6, ZONE_B); + } + }; + + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(8); + + @Before + public void setup() { + // Start the locator + MemberVM locatorVM = clusterStartupRule.startLocatorVM(0); + locatorPort = locatorVM.getPort(); + + workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath(); + + runID.incrementAndGet(); + cleanOutServerDirectories(); + } + + @After + public void after() { + 
stopServersAndDeleteDirectories(); + } + + /** + * Test that we correctly use the redundancy-zone property to determine where to place redundant + * copies of a bucket and that we don't allow cross redundancy zone deletes. + * + * @param rebalanceServer - the server index that will initiate all the rebalances + * @param serverToBeShutdownAndRestarted - the server index that will be shut down and restarted + */ + @Test + @Parameters({"1,2", "1,4", "4,1", "5,6"}) + public void testEnforceZoneWithSixServersAndTwoZones(int rebalanceServer, + int serverToBeShutdownAndRestarted) + throws Exception { + + // Startup the servers + for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) { + startServerInRedundancyZone(entry.getKey(), entry.getValue()); + } + + // Put data in the server regions + clientPopulateServers(); + + // Rebalance Server VM will initiate all of the rebalances in this test + VM rebalanceServerVM = clusterStartupRule.getVM(rebalanceServer); + + // Baseline rebalance with everything up + rebalanceServerVM.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager())); + + // Take the serverToBeShutdownAndRestarted offline + clusterStartupRule.stop(serverToBeShutdownAndRestarted, false); + + // Rebalance so that now all the buckets are on the other two servers. + // Zone b servers should not be touched. + rebalanceServerVM.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager())); + + // Restart the serverToBeShutdownAndRestarted + startServerInRedundancyZone(serverToBeShutdownAndRestarted, + SERVER_ZONE_MAP.get(serverToBeShutdownAndRestarted)); + + // Do another rebalance to make sure all the buckets are distributed evenly(ish) and there are + // no cross redundancy zone bucket deletions. 
+ rebalanceServerVM.invoke(() -> doRebalance(ClusterStartupRule.getCache().getResourceManager())); + + // Verify that all bucket counts add up to what they should + compareZoneBucketCounts(COLOCATED_REGION_NAME); + compareZoneBucketCounts(REGION_NAME); + } + + private void stopServersAndDeleteDirectories() { + for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) { + clusterStartupRule.stop(entry.getKey(), true); + } + cleanOutServerDirectories(); + } + + private void cleanOutServerDirectories() { + for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) { + int index = entry.getKey(); + VM.getVM(index).invoke(() -> { + String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + index; + File temporaryDirectory = new File(path); + if (temporaryDirectory.exists()) { + try { + Arrays.stream(temporaryDirectory.listFiles()).forEach(FileUtils::deleteQuietly); + Files.delete(temporaryDirectory.toPath()); + } catch (Exception exception) { + logger.error("The delete of files or directory failed ", exception); + throw exception; + } + } + }); + } + } + + /** + * Startup a client to put all the data in the server regions + */ + private void clientPopulateServers() throws Exception { + Properties properties2 = new Properties(); + properties2.setProperty("cache-xml-file", CLIENT_XML); + ClientVM clientVM = + clusterStartupRule.startClientVM(SERVER_ZONE_MAP.size() + 1, properties2, + ccf -> ccf.addPoolLocator("localhost", locatorPort)); + + clientVM.invoke(() -> { + Map<Integer, String> putMap = new HashMap<>(); + for (int i = 0; i < 1000; i++) { + putMap.put(i, "A"); + } + + ClientCache clientCache = ClusterStartupRule.getClientCache(); + + Stream.of(REGION_NAME, COLOCATED_REGION_NAME).forEach(regionName -> { + Region<Integer, String> region = clientCache.getRegion(regionName); + region.putAll(putMap); + }); + }); + } + + /** + * Startup server *index* in *redundancy zone* + * + * @param index - server + * @param zone - Redundancy zone for the 
server to be started in + */ + private void startServerInRedundancyZone(int index, final String zone) { + VM.getVM(index).invoke(() -> { + String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + index; + + File temporaryDirectory = new File(path); + if (!temporaryDirectory.exists()) { + Files.createDirectory(temporaryDirectory.toPath()); + } + System.setProperty(GEODE_PREFIX + DEFAULT_DISK_DIRS_PROPERTY, path); + }); + + clusterStartupRule.startServerVM(index, s -> s.withProperty("cache-xml-file", + SERVER_XML) + .withProperty(REDUNDANCY_ZONE, zone) + .withConnectionToLocator(locatorPort)); + } + + /** + * Trigger a rebalance of buckets + * + */ + private void doRebalance(ResourceManager manager) + throws TimeoutException, InterruptedException { + manager.createRebalanceFactory() + .start().getResults(TIMEOUT_SECONDS, SECONDS); + assertThat(manager.getRebalanceOperations()).isEmpty(); + } + + /** + * Compare the bucket counts for each zone. They should be equal + * + */ + private void compareZoneBucketCounts(final String regionName) { + int zoneABucketCount = getZoneBucketCount(regionName, ZONE_A); + int zoneBBucketCount = getZoneBucketCount(regionName, ZONE_B); + + assertThat(zoneABucketCount).isEqualTo(zoneBBucketCount).isEqualTo(EXPECTED_BUCKET_COUNT); + } + + /** + * Get the bucket count for the region in the redundancy zone + * + * @param regionName - name of the region to get the bucket count of + * @param zoneName - redundancy zone for which to get the bucket count + * @return - the total bucket count for the region in the redundancy zone + */ + private int getZoneBucketCount(String regionName, String zoneName) { + int bucketCount = 0; + for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) { + if (entry.getValue().compareTo(zoneName) == 0) { + bucketCount += + clusterStartupRule.getVM(entry.getKey()).invoke(() -> { + PartitionedRegion region = + (PartitionedRegion) ClusterStartupRule.getCache().getRegion(regionName); + return 
region.getLocalBucketsListTestOnly().size(); + }); + } + } + return bucketCount; + } +} diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml new file mode 100755 index 0000000..bc33c6e --- /dev/null +++ b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<client-cache + xmlns="http://geode.apache.org/schema/cache" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://geode.apache.org/schema/cache + http://geode.apache.org/schema/cache/cache-1.0.xsd" + version="1.0"> + + <pdx read-serialized="false"> + <pdx-serializer> + <class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name> + <parameter name="classes"> + <string>.*</string> + </parameter> + </pdx-serializer> + </pdx> + <region name="primary" refid="PROXY"/> + <region name="colocated" refid="PROXY"/> +</client-cache> diff --git a/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml new file mode 100755 index 0000000..d313d90 --- /dev/null +++ b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml @@ -0,0 +1,48 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<cache + xmlns="http://geode.apache.org/schema/cache" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://geode.apache.org/schema/cache + http://geode.apache.org/schema/cache/cache-1.0.xsd" + version="1.0"> + + <cache-server port="0"/> + + <pdx read-serialized="false" persistent="true"> + <pdx-serializer> + <class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name> + <parameter name="classes"> + <string>.*</string> + </parameter> + </pdx-serializer> + </pdx> + + <region name="primary" refid="PARTITION_REDUNDANT_PERSISTENT"> + <region-attributes> + <partition-attributes total-num-buckets="113" redundant-copies="1" recovery-delay="-1" startup-recovery-delay="-1"/> + </region-attributes> + </region> + + <region name="colocated" refid="PARTITION_REDUNDANT_PERSISTENT"> + <region-attributes> + <partition-attributes colocated-with="primary" total-num-buckets="113" redundant-copies="1" recovery-delay="-1" startup-recovery-delay="-1"/> + </region-attributes> + </region> + +</cache> diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java index 5b17978..f874c76 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java @@ -826,15 +826,6 @@ public class PartitionedRegionDataStore implements HasCachePerfStats { dumpBucket(bucketId, bucketRegion); } - // Iterator i = localRegion.entrySet().iterator(); - // while (i.hasNext()) { - // try { - // NonTXEntry nte = (NonTXEntry) i.next(); - // // updateBucket2Size(bucketId.longValue(), localRegion, null); - // // nte.getRegionEntry().getValueInVM(); - // } catch (EntryDestroyedException ignore) {} - // } - return bucketRegion; } diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java index 5eeb8aa..aaf4021 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java @@ -48,6 +48,27 @@ public class PartitionRebalanceDetailsImpl private long time; private int numOfMembers; + @Override + public String toString() { + return "PartitionRebalanceDetailsImpl{" + + "bucketCreateBytes=" + bucketCreateBytes + + ", bucketCreateTime=" + bucketCreateTime + + ", bucketCreatesCompleted=" + bucketCreatesCompleted + + ", bucketRemoveBytes=" + bucketRemoveBytes + + ", bucketRemoveTime=" + bucketRemoveTime + + ", bucketRemovesCompleted=" + bucketRemovesCompleted + + ", bucketTransferBytes=" + bucketTransferBytes + + ", bucketTransferTime=" + bucketTransferTime + + ", bucketTransfersCompleted=" + bucketTransfersCompleted + + ", partitionMemberDetailsAfter=" + partitionMemberDetailsAfter + + ", partitionMemberDetailsBefore=" + partitionMemberDetailsBefore + + ", primaryTransferTime=" + primaryTransferTime + + ", primaryTransfersCompleted=" + primaryTransfersCompleted + + ", region=" + region + + ", time=" + time + + '}'; + } + public PartitionRebalanceDetailsImpl(PartitionedRegion region) { this.region = region; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java index 6dd53e7..757e227 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java @@ -290,9 +290,8 @@ public class 
PartitionedRegionRebalanceOp { // Early out of this VM doesn't have all of the regions that are // supposed to be colocated with this one. // TODO rebalance - there is a race here between this check and the - // earlier acquisition of the list - // of colocated regions. Really we should get a list of all colocated - // regions on all nodes. + // earlier acquisition of the list of colocated regions. Really we + // should get a list of all colocated regions on all nodes. if (!ColocationHelper.checkMembersColocation(leaderRegion, leaderRegion.getDistributionManager().getDistributionManagerId())) { return false; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java index 777caeb..462a219 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java @@ -19,6 +19,9 @@ import java.util.TreeSet; import org.apache.logging.log4j.Logger; +import org.apache.geode.annotations.VisibleForTesting; +import org.apache.geode.distributed.internal.ClusterDistributionManager; +import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.distributed.internal.membership.InternalDistributedMember; import org.apache.geode.logging.internal.log4j.api.LogService; @@ -40,7 +43,9 @@ public class Member implements Comparable<Member> { private final boolean isCritical; private final boolean enforceLocalMaxMemory; - Member(AddressComparor addressComparor, InternalDistributedMember memberId, boolean isCritical, + @VisibleForTesting + public Member(AddressComparor addressComparor, InternalDistributedMember memberId, + boolean isCritical, boolean enforceLocalMaxMemory) { this.addressComparor = addressComparor; this.memberId = memberId; @@ -48,7 +53,8 @@ public class Member 
implements Comparable<Member> { this.enforceLocalMaxMemory = enforceLocalMaxMemory; } - Member(AddressComparor addressComparor, InternalDistributedMember memberId, float weight, + @VisibleForTesting + public Member(AddressComparor addressComparor, InternalDistributedMember memberId, float weight, long localMaxMemory, boolean isCritical, boolean enforceLocalMaxMemory) { this(addressComparor, memberId, isCritical, enforceLocalMaxMemory); this.weight = weight; @@ -56,6 +62,55 @@ public class Member implements Comparable<Member> { } /** + * Check to see if the member is the last copy of the bucket in the redundancy zone + * + * @param bucket -- bucket to be deleted from the member + * @param distributionManager -- used to check members of redundancy zones + */ + + public RefusalReason canDelete(Bucket bucket, DistributionManager distributionManager) { + // This code only applies to Clusters. + if (!(distributionManager instanceof ClusterDistributionManager)) { + return RefusalReason.NONE; + } + + ClusterDistributionManager clstrDistrMgr = (ClusterDistributionManager) distributionManager; + String myRedundancyZone = clstrDistrMgr.getRedundancyZone(memberId); + boolean lastMemberOfZone = true; + + if (myRedundancyZone == null) { + // Not using redundancy zones, so... + return RefusalReason.NONE; + } + + for (Member member : bucket.getMembersHosting()) { + // Don't look at yourself because you are not redundant for yourself + if (member.getMemberId().equals(this.getMemberId())) { + continue; + } + + String memberRedundancyZone = clstrDistrMgr.getRedundancyZone(member.memberId); + if (memberRedundancyZone == null) { + // Not using redundancy zones, so... + continue; + } + + // Does the member redundancy zone match my redundancy zone? + // if so we are not the last in the redundancy zone. 
+ if (memberRedundancyZone.equals(myRedundancyZone)) { + lastMemberOfZone = false; + } + } + + if (lastMemberOfZone) { + return RefusalReason.LAST_MEMBER_IN_ZONE; + } + + return RefusalReason.NONE; + } + + + /** * @param sourceMember the member we will be moving this bucket off of * @param checkZone true if we should not put two copies of a bucket on two nodes with the same * IP address. @@ -213,7 +268,7 @@ public class Member implements Comparable<Member> { } void changeTotalBytes(float change) { - totalBytes += (long) Math.round(change); + totalBytes += Math.round(change); } @Override diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java index 0457ba3..ec6b3ea 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java @@ -53,7 +53,7 @@ import org.apache.geode.logging.internal.log4j.api.LogService; * are assumed to be colocated, and the model adds together the load from each of the individual * regions to balance all of the regions together. * - * Reblancing operations are performed by repeatedly calling model.nextStep until it returns false. + * Rebalancing operations are performed by repeatedly calling model.nextStep until it returns false. * Each call to nextStep should perform another operation. The model will make callbacks to the * BucketOperator you provide to the contructor perform the actual create or move. 
* @@ -125,9 +125,13 @@ public class PartitionedRegionLoadModel { private final BucketOperator operator; private final int requiredRedundancy; - /** The average primary load on a member */ + /** + * The average primary load on a member + */ private float primaryAverage = -1; - /** The average bucket load on a member */ + /** + * The average bucket load on a member + */ private float averageLoad = -1; /** * The minimum improvement in variance that we'll consider worth moving a primary @@ -151,7 +155,8 @@ public class PartitionedRegionLoadModel { * @param redundancyLevel The expected redundancy level for the region */ public PartitionedRegionLoadModel(BucketOperator operator, int redundancyLevel, int numBuckets, - AddressComparor addressComparor, Set<InternalDistributedMember> criticalMembers, + AddressComparor addressComparor, + Set<InternalDistributedMember> criticalMembers, PartitionedRegion region) { this.operator = operator; this.requiredRedundancy = redundancyLevel; @@ -292,7 +297,7 @@ public class PartitionedRegionLoadModel { new Object[] {memberRollup, this.allColocatedRegions, memberRollup.getColocatedMembers().keySet(), memberRollup.getBuckets()}); } - for (Bucket bucket : new HashSet<Bucket>(memberRollup.getBuckets())) { + for (Bucket bucket : new HashSet<>(memberRollup.getBuckets())) { bucket.removeMember(memberRollup); } } @@ -473,21 +478,36 @@ public class PartitionedRegionLoadModel { Move bestMove = null; for (Member member : bucket.getMembersHosting()) { + + // If we can't delete it, continue (last member of zone) + if (!member.canDelete(bucket, partitionedRegion.getDistributionManager()).willAccept()) { + continue; + } + + // if this load is lower than the highest load, we prefer deleting from high + // load servers so move on. If this member is the bucket primary, we prefer not to move + // primaries, so move on. 
float newLoad = (member.getTotalLoad() - bucket.getLoad()) / member.getWeight(); - if (newLoad > mostLoaded && !member.equals(bucket.getPrimary())) { - Move move = new Move(null, member, bucket); - if (!this.attemptedBucketRemoves.contains(move)) { - mostLoaded = newLoad; - bestMove = move; - } + if (newLoad <= mostLoaded || member.equals(bucket.getPrimary())) { + continue; } + + // if the attemptedBucketRemovesList contains this move, then we don't need to add it + // again. + Move move = new Move(null, member, bucket); + if (this.attemptedBucketRemoves.contains(move)) { + continue; + } + + mostLoaded = newLoad; + bestMove = move; } return bestMove; } public Move findBestTargetForFPR(Bucket bucket, boolean checkIPAddress) { - InternalDistributedMember targetMemberID = null; - Member targetMember = null; + InternalDistributedMember targetMemberID; + Member targetMember; List<FixedPartitionAttributesImpl> fpas = this.partitionedRegion.getFixedPartitionAttributesImpl(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java index bd71b40..fcb95d2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java @@ -15,7 +15,13 @@ package org.apache.geode.internal.cache.partitioned.rebalance.model; public enum RefusalReason { - NONE, ALREADY_HOSTING, UNITIALIZED_MEMBER, SAME_ZONE, LOCAL_MAX_MEMORY_FULL, CRITICAL_HEAP; + NONE, + ALREADY_HOSTING, + UNITIALIZED_MEMBER, + SAME_ZONE, + LAST_MEMBER_IN_ZONE, + LOCAL_MAX_MEMORY_FULL, + CRITICAL_HEAP; public boolean willAccept() { return this == NONE; @@ -42,6 +48,11 @@ public enum RefusalReason { case CRITICAL_HEAP: return "Target member " + target.getMemberId() + " has reached its critical heap 
percentage, and cannot accept more data"; + case LAST_MEMBER_IN_ZONE: + return "Target member " + target.getMemberId() + + " is the last member of redundancy zone for the bucket " + + bucket.getId() + + ": " + bucket.getMembersHosting(); default: return this.toString(); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java new file mode 100644 index 0000000..c1db31a --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.RefusalReason;
+// Unit tests for Member.canDelete(Bucket, ClusterDistributionManager), run against mocks.
+public class MemberTest {
+  private AddressComparor addressComparor = mock(AddressComparor.class);
+  private InternalDistributedMember memberId = mock(InternalDistributedMember.class);
+  private InternalDistributedMember otherMemberId = mock(InternalDistributedMember.class);
+
+  @Test
+  public void testCanDeleteWhenNotLastInZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    // Two members host the bucket: the member under test and one other.
+    Member memberUnderTest = new Member(addressComparor, memberId, false, false);
+    Member otherMember = new Member(addressComparor, otherMemberId, false, false);
+    membersHostingBucket.add(memberUnderTest);
+    membersHostingBucket.add(otherMember);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+    // Both members report the same redundancy zone, "zoneA".
+    Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+    Mockito.when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn("zoneA");
+
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.NONE);
+  }
+
+  @Test
+  public void testCanDeleteWhenLastInZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    // The member under test is the only member hosting the bucket.
+    Member memberUnderTest = new Member(addressComparor, memberId, false, false);
+    membersHostingBucket.add(memberUnderTest);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+    // With no other host in "zoneA", the expected answer is LAST_MEMBER_IN_ZONE.
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.LAST_MEMBER_IN_ZONE);
+  }
+
+  // No redundancy zone: getRedundancyZone is stubbed to return null for both members.
+  @Test
+  public void testCanDeleteWhenNoZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+    Member memberUnderTest = new Member(addressComparor, memberId, false, false);
+    Member otherMember = new Member(addressComparor, otherMemberId, false, false);
+    membersHostingBucket.add(memberUnderTest);
+    membersHostingBucket.add(otherMember);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn(null);
+    Mockito.when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn(null);
+
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.NONE);
+  }
+
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
index 1d3ce1f..e25dc6b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
@@ -14,8 +14,12 @@
  */
 package org.apache.geode.internal.cache.partitioned.rebalance;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -24,7 +28,6 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -39,16 +42,23 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.partition.PartitionMemberInfo;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
 import org.apache.geode.internal.cache.partitioned.OfflineMemberDetailsImpl;
 import org.apache.geode.internal.cache.partitioned.PRLoad;
 import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
 import org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator.Completion;
 import org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Move;
 import org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.inet.LocalHostUtil;
@@ -1489,11 +1499,62 @@ public class PartitionedRegionLoadModelJUnitTest {
     model.addRegion("primary", Arrays.asList(details1, details2), offlineDetails, true);
     assertEquals(3, doMoves(new CompositeDirector(true, true, true, true), model));
-    Collection<Move> expectedMoves = new ArrayList<>();
+    List<Move> expectedMoves = new ArrayList<>();
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
-    assertEquals(expectedMoves, bucketOperator.bucketMoves);
+    assertThat(expectedMoves).containsAll(bucketOperator.bucketMoves);
+  }
+
+  // Two members each host bucket. fullyLoadedMember hosts bucket2 as well.
+  // findBestRemove is expected to return a Move whose target is fullyLoadedMember.
+  @Test
+  public void testFindBestRemoveRemovesFromMoreLoadedMember() {
+    Bucket bucket = mock(Bucket.class);
+    Bucket bucket2 = mock(Bucket.class);
+    AddressComparor addressComparor = mock(AddressComparor.class);
+    ClusterDistributionManager clusterDistributionManager = mock(ClusterDistributionManager.class);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    PartitionedRegionLoadModel model = new PartitionedRegionLoadModel(bucketOperator, 2, 6,
+        addressComparor, Collections.emptySet(), partitionedRegion);
+    InternalDistributedMember memberId = mock(InternalDistributedMember.class);
+    InternalDistributedMember otherMemberId = mock(InternalDistributedMember.class);
+    // Stubs: region yields the mocked manager; zones enforced; every member pair is same-zone.
+    when(partitionedRegion.getDistributionManager()).thenReturn(clusterDistributionManager);
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Member halfLoadedMember = new Member(addressComparor, memberId, 100, 100, false, false);
+    Member fullyLoadedMember = new Member(addressComparor, otherMemberId, 100, 100, false, false);
+    // bucket's primary is halfLoadedMember and bucket2's is fullyLoadedMember.
+    when(bucket.getPrimary()).thenReturn(halfLoadedMember);
+    when(bucket.getBytes()).thenReturn(10000000L);
+    when(bucket.getLoad()).thenReturn(50.0f);
+    when(bucket2.getPrimary()).thenReturn(fullyLoadedMember);
+    when(bucket2.getBytes()).thenReturn(10000000L);
+    when(bucket2.getLoad()).thenReturn(50.0f);
+    // fullyLoadedMember hosts bucket and bucket2; halfLoadedMember hosts only bucket.
+    fullyLoadedMember.addBucket(bucket);
+    fullyLoadedMember.addBucket(bucket2);
+    halfLoadedMember.addBucket(bucket);
+    membersHostingBucket.add(halfLoadedMember);
+    membersHostingBucket.add(fullyLoadedMember);
+
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+    Mockito.when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn("zoneA");
+    // NOTE(review): repeats the getMembersHosting() stubbing from a few lines up; redundant.
+    Mockito.when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+    // NOTE(review): fully qualified; presumably plain Move names a test-local class - confirm.
+    org.apache.geode.internal.cache.partitioned.rebalance.model.Move bestRemove =
+        model.findBestRemove(bucket);
+
+    assertThat(bestRemove).isNotNull();
+    assertThat(bestRemove.getTarget()).isEqualTo(fullyLoadedMember);
   }
 
   private int doMoves(RebalanceDirector director, PartitionedRegionLoadModel model) {