This is an automated email from the ASF dual-hosted git repository.

mhanson pushed a commit to branch support/1.13
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.13 by this push:
     new d94aa64  GEODE-9554 backport (#6856)
d94aa64 is described below

commit d94aa6431c4f19167feea417be4739c6746ab4e3
Author: mhansonp <hans...@vmware.com>
AuthorDate: Thu Sep 9 21:51:16 2021 -0700

    GEODE-9554 backport (#6856)
    
    * GEODE-9554: Change up the rebalance calls to use new canDelete call 
(#6845) (#6853)
    
    GEODE-9554: Change up the rebalance calls to use new canDelete call
    
    If there is no redundancy zone, exit and allow delete
    Adding xml files to git
    Adding xml files to git
    Added new tests
    
    (cherry picked from commit d1d605b24787698c5d8f47b8538808d6b990c0a4)
    (cherry picked from commit 142e06fe001aae428c32ec602992e45bcefa2259)
    
    * GEODE-5994: Cleanup from some additional review comments (#6852)
    
    (cherry picked from commit 7b924b5ccef4646f28615bab290447824f9f9f45)
    (cherry picked from commit 69ca9cdf75904f70d6272286f4322ba90ee8bfa3)
---
 .../RebalanceOperationComplexDistributedTest.java  | 272 +++++++++++++++++++++
 .../cache/RebalanceOperationComplex-client.xml     |  35 +++
 .../cache/RebalanceOperationComplex-server.xml     |  48 ++++
 .../internal/cache/PartitionedRegionDataStore.java |   9 -
 .../control/PartitionRebalanceDetailsImpl.java     |  21 ++
 .../partitioned/PartitionedRegionRebalanceOp.java  |   5 +-
 .../cache/partitioned/rebalance/model/Member.java  |  49 +++-
 .../model/PartitionedRegionLoadModel.java          |  46 +++-
 .../partitioned/rebalance/model/RefusalReason.java |  13 +-
 .../cache/partitioned/rebalance/MemberTest.java    | 107 ++++++++
 .../PartitionedRegionLoadModelJUnitTest.java       |  65 ++++-
 11 files changed, 638 insertions(+), 32 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java
new file mode 100644
index 0000000..9773437
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/control/RebalanceOperationComplexDistributedTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REDUNDANCY_ZONE;
+import static 
org.apache.geode.internal.lang.SystemPropertyHelper.DEFAULT_DISK_DIRS_PROPERTY;
+import static org.apache.geode.internal.lang.SystemPropertyHelper.GEODE_PREFIX;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * The purpose of RebalanceOperationComplexDistributedTest is to test 
rebalances
+ * across zones and to ensure that enforceUniqueZone behavior of redundancy 
zones
+ * is working correctly.
+ */
+@RunWith(JUnitParamsRunner.class)
+public class RebalanceOperationComplexDistributedTest implements Serializable {
+  public static final int EXPECTED_BUCKET_COUNT = 113;
+  public static final long TIMEOUT_SECONDS = 
GeodeAwaitility.getTimeout().getSeconds();
+  public static final String CLIENT_XML = 
"RebalanceOperationComplex-client.xml";
+  public static final String SERVER_XML = 
"RebalanceOperationComplex-server.xml";
+  public static final String REGION_NAME = "primary";
+  public static final String COLOCATED_REGION_NAME = "colocated";
+  public static final Logger logger = LogService.getLogger();
+
+  public static final String ZONE_A = "zoneA";
+  public static final String ZONE_B = "zoneB";
+  public int locatorPort;
+  public static final AtomicInteger runID = new AtomicInteger(0);
+  public String workingDir;
+
+  // 6 servers distributed evenly across 2 zones
+  public static final Map<Integer, String> SERVER_ZONE_MAP = new 
HashMap<Integer, String>() {
+    {
+      put(1, ZONE_A);
+      put(2, ZONE_A);
+      put(3, ZONE_A);
+      put(4, ZONE_B);
+      put(5, ZONE_B);
+      put(6, ZONE_B);
+    }
+  };
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(8);
+
+  @Before
+  public void setup() throws Exception {
+    // Start the locator
+    MemberVM locatorVM = clusterStartupRule.startLocatorVM(0);
+    locatorPort = locatorVM.getPort();
+
+    workingDir = clusterStartupRule.getWorkingDirRoot().getAbsolutePath();
+
+    runID.incrementAndGet();
+    cleanOutServerDirectories();
+
+    // Startup the servers
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      startServerInRedundancyZone(entry.getKey(), entry.getValue());
+    }
+
+    // Put data in the server regions
+    clientPopulateServers();
+
+  }
+
+  @After
+  public void after() {
+    stopServersAndDeleteDirectories();
+  }
+
+  /**
+   * Test that we correctly use the redundancy-zone property to determine 
where to place redundant
+   * copies of a buckets and doesn't allow cross redundancy zone deletes.
+   *
+   * @param rebalanceServer - the server index that will initiate all the 
rebalances
+   * @param serverToBeShutdownAndRestarted - the server index that will be 
shutdown and restarted
+   */
+  @Test
+  @Parameters({"1,2", "1,4", "4,1", "5,6"})
+  public void testEnforceZoneWithSixServersAndTwoZones(int rebalanceServer,
+      int serverToBeShutdownAndRestarted) {
+
+    // Rebalance Server VM will initiate all of the rebalances in this test
+    VM rebalanceServerVM = clusterStartupRule.getVM(rebalanceServer);
+
+    // Baseline rebalance with everything up
+    rebalanceServerVM.invoke(() -> 
doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Take the serverToBeShutdownAndRestarted offline
+    clusterStartupRule.stop(serverToBeShutdownAndRestarted, false);
+
+    // Rebalance so that now all the buckets are on the other two servers.
+    // Zone b servers should not be touched.
+    rebalanceServerVM.invoke(() -> 
doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Restart the serverToBeShutdownAndRestarted
+    startServerInRedundancyZone(serverToBeShutdownAndRestarted,
+        SERVER_ZONE_MAP.get(serverToBeShutdownAndRestarted));
+
+    // Do another rebalance to make sure all the buckets are distributed 
evenly(ish) and there are
+    // no cross redundancy zone bucket deletions.
+    rebalanceServerVM.invoke(() -> 
doRebalance(ClusterStartupRule.getCache().getResourceManager()));
+
+    // Verify that all bucket counts add up to what they should
+    compareZoneBucketCounts(COLOCATED_REGION_NAME);
+    compareZoneBucketCounts(REGION_NAME);
+  }
+
+  private void stopServersAndDeleteDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      clusterStartupRule.stop(entry.getKey(), true);
+    }
+    cleanOutServerDirectories();
+  }
+
+  private void cleanOutServerDirectories() {
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      int index = entry.getKey();
+      VM.getVM(index).invoke(() -> {
+        String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + 
index;
+        File temporaryDirectory = new File(path);
+        if (temporaryDirectory.exists()) {
+          try {
+            
Arrays.stream(temporaryDirectory.listFiles()).forEach(FileUtils::deleteQuietly);
+            Files.delete(temporaryDirectory.toPath());
+          } catch (Exception exception) {
+            logger.error("The delete of files or directory failed ", 
exception);
+            throw exception;
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Startup a client to put all the data in the server regions
+   */
+  private void clientPopulateServers() throws Exception {
+    Properties properties2 = new Properties();
+    properties2.setProperty("cache-xml-file", CLIENT_XML);
+    ClientVM clientVM =
+        clusterStartupRule.startClientVM(SERVER_ZONE_MAP.size() + 1, 
properties2,
+            ccf -> ccf.addPoolLocator("localhost", locatorPort));
+
+    clientVM.invoke(() -> {
+      Map<Integer, String> putMap = new HashMap<>();
+      for (int i = 0; i < 1000; i++) {
+        putMap.put(i, "A");
+      }
+
+      ClientCache clientCache = ClusterStartupRule.getClientCache();
+
+      Stream.of(REGION_NAME, COLOCATED_REGION_NAME).forEach(regionName -> {
+        Region<Integer, String> region = clientCache.getRegion(regionName);
+        region.putAll(putMap);
+      });
+    });
+  }
+
+  /**
+   * Startup server *index* in *redundancy zone*
+   *
+   * @param index - server
+   * @param zone - Redundancy zone for the server to be started in
+   */
+  private void startServerInRedundancyZone(int index, final String zone) {
+    VM.getVM(index).invoke(() -> {
+      String path = workingDir + "/" + "runId-" + runID.get() + "-vm-" + index;
+
+      File temporaryDirectory = new File(path);
+      if (!temporaryDirectory.exists()) {
+        Files.createDirectory(temporaryDirectory.toPath());
+      }
+      System.setProperty(GEODE_PREFIX + DEFAULT_DISK_DIRS_PROPERTY, path);
+    });
+
+    clusterStartupRule.startServerVM(index, s -> 
s.withProperty("cache-xml-file",
+        SERVER_XML)
+        .withProperty(REDUNDANCY_ZONE, zone)
+        .withConnectionToLocator(locatorPort));
+  }
+
+  /**
+   * Trigger a rebalance of buckets
+   *
+   */
+  private void doRebalance(ResourceManager manager)
+      throws TimeoutException, InterruptedException {
+    manager.createRebalanceFactory()
+        .start().getResults(TIMEOUT_SECONDS, SECONDS);
+    assertThat(manager.getRebalanceOperations()).isEmpty();
+  }
+
+  /**
+   * Compare the bucket counts for each zone. They should be equal
+   *
+   */
+  private void compareZoneBucketCounts(final String regionName) {
+    int zoneABucketCount = getZoneBucketCount(regionName, ZONE_A);
+    int zoneBBucketCount = getZoneBucketCount(regionName, ZONE_B);
+
+    
assertThat(zoneABucketCount).isEqualTo(zoneBBucketCount).isEqualTo(EXPECTED_BUCKET_COUNT);
+  }
+
+  /**
+   * Get the bucket count for the region in the redundancy zone
+   *
+   * @param regionName - name of the region to get the bucket count of
+   * @param zoneName - redundancy zone for which to get the bucket count
+   * @return - the total bucket count for the region in the redundancy zone
+   */
+  private int getZoneBucketCount(String regionName, String zoneName) {
+    int bucketCount = 0;
+    for (Map.Entry<Integer, String> entry : SERVER_ZONE_MAP.entrySet()) {
+      if (entry.getValue().compareTo(zoneName) == 0) {
+        bucketCount +=
+            clusterStartupRule.getVM(entry.getKey()).invoke(() -> {
+              PartitionedRegion region =
+                  (PartitionedRegion) 
ClusterStartupRule.getCache().getRegion(regionName);
+              return region.getLocalBucketsListTestOnly().size();
+            });
+      }
+    }
+    return bucketCount;
+  }
+}
diff --git 
a/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml
 
b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml
new file mode 100755
index 0000000..bc33c6e
--- /dev/null
+++ 
b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-client.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<client-cache
+        xmlns="http://geode.apache.org/schema/cache";
+        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+        xsi:schemaLocation="http://geode.apache.org/schema/cache
+                        http://geode.apache.org/schema/cache/cache-1.0.xsd";
+        version="1.0">
+
+  <pdx read-serialized="false">
+    <pdx-serializer>
+      
<class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
+      <parameter name="classes">
+        <string>.*</string>
+      </parameter>
+    </pdx-serializer>
+  </pdx>
+  <region name="primary" refid="PROXY"/>
+  <region name="colocated" refid="PROXY"/>
+</client-cache>
diff --git 
a/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml
 
b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml
new file mode 100755
index 0000000..d313d90
--- /dev/null
+++ 
b/geode-core/src/distributedTest/resources/org/apache/geode/internal/cache/RebalanceOperationComplex-server.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<cache
+  xmlns="http://geode.apache.org/schema/cache";
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+  xsi:schemaLocation="http://geode.apache.org/schema/cache
+                      http://geode.apache.org/schema/cache/cache-1.0.xsd";
+  version="1.0">
+  
+  <cache-server port="0"/>
+  
+  <pdx read-serialized="false" persistent="true">
+    <pdx-serializer>
+      
<class-name>org.apache.geode.pdx.ReflectionBasedAutoSerializer</class-name>
+      <parameter name="classes">
+        <string>.*</string>
+      </parameter>
+    </pdx-serializer>
+  </pdx>
+
+  <region name="primary" refid="PARTITION_REDUNDANT_PERSISTENT">
+    <region-attributes>
+      <partition-attributes total-num-buckets="113" redundant-copies="1" 
recovery-delay="-1" startup-recovery-delay="-1"/>
+    </region-attributes>
+  </region>
+
+  <region name="colocated" refid="PARTITION_REDUNDANT_PERSISTENT">
+    <region-attributes>
+      <partition-attributes colocated-with="primary" total-num-buckets="113" 
redundant-copies="1" recovery-delay="-1" startup-recovery-delay="-1"/>
+    </region-attributes>
+  </region>
+
+</cache>
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index 19adb18..4873eb6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -826,15 +826,6 @@ public class PartitionedRegionDataStore implements 
HasCachePerfStats {
       dumpBucket(bucketId, bucketRegion);
     }
 
-    // Iterator i = localRegion.entrySet().iterator();
-    // while (i.hasNext()) {
-    // try {
-    // NonTXEntry nte = (NonTXEntry) i.next();
-    // // updateBucket2Size(bucketId.longValue(), localRegion, null);
-    // // nte.getRegionEntry().getValueInVM();
-    // } catch (EntryDestroyedException ignore) {}
-    // }
-
     return bucketRegion;
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
index 5eeb8aa..aaf4021 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/control/PartitionRebalanceDetailsImpl.java
@@ -48,6 +48,27 @@ public class PartitionRebalanceDetailsImpl
   private long time;
   private int numOfMembers;
 
+  @Override
+  public String toString() {
+    return "PartitionRebalanceDetailsImpl{" +
+        "bucketCreateBytes=" + bucketCreateBytes +
+        ", bucketCreateTime=" + bucketCreateTime +
+        ", bucketCreatesCompleted=" + bucketCreatesCompleted +
+        ", bucketRemoveBytes=" + bucketRemoveBytes +
+        ", bucketRemoveTime=" + bucketRemoveTime +
+        ", bucketRemovesCompleted=" + bucketRemovesCompleted +
+        ", bucketTransferBytes=" + bucketTransferBytes +
+        ", bucketTransferTime=" + bucketTransferTime +
+        ", bucketTransfersCompleted=" + bucketTransfersCompleted +
+        ", partitionMemberDetailsAfter=" + partitionMemberDetailsAfter +
+        ", partitionMemberDetailsBefore=" + partitionMemberDetailsBefore +
+        ", primaryTransferTime=" + primaryTransferTime +
+        ", primaryTransfersCompleted=" + primaryTransfersCompleted +
+        ", region=" + region +
+        ", time=" + time +
+        '}';
+  }
+
   public PartitionRebalanceDetailsImpl(PartitionedRegion region) {
     this.region = region;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
index 6dd53e7..757e227 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionedRegionRebalanceOp.java
@@ -290,9 +290,8 @@ public class PartitionedRegionRebalanceOp {
     // Early out of this VM doesn't have all of the regions that are
     // supposed to be colocated with this one.
     // TODO rebalance - there is a race here between this check and the
-    // earlier acquisition of the list
-    // of colocated regions. Really we should get a list of all colocated
-    // regions on all nodes.
+    // earlier acquisition of the list of colocated regions. Really we
+    // should get a list of all colocated regions on all nodes.
     if (!ColocationHelper.checkMembersColocation(leaderRegion,
         leaderRegion.getDistributionManager().getDistributionManagerId())) {
       return false;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
index 777caeb..87000f7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/Member.java
@@ -19,6 +19,8 @@ import java.util.TreeSet;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.distributed.internal.DistributionManager;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
@@ -40,7 +42,9 @@ public class Member implements Comparable<Member> {
   private final boolean isCritical;
   private final boolean enforceLocalMaxMemory;
 
-  Member(AddressComparor addressComparor, InternalDistributedMember memberId, 
boolean isCritical,
+  @VisibleForTesting
+  public Member(AddressComparor addressComparor, InternalDistributedMember 
memberId,
+      boolean isCritical,
       boolean enforceLocalMaxMemory) {
     this.addressComparor = addressComparor;
     this.memberId = memberId;
@@ -48,7 +52,8 @@ public class Member implements Comparable<Member> {
     this.enforceLocalMaxMemory = enforceLocalMaxMemory;
   }
 
-  Member(AddressComparor addressComparor, InternalDistributedMember memberId, 
float weight,
+  @VisibleForTesting
+  public Member(AddressComparor addressComparor, InternalDistributedMember 
memberId, float weight,
       long localMaxMemory, boolean isCritical, boolean enforceLocalMaxMemory) {
     this(addressComparor, memberId, isCritical, enforceLocalMaxMemory);
     this.weight = weight;
@@ -56,6 +61,44 @@ public class Member implements Comparable<Member> {
   }
 
   /**
+   * Check to see if the member is the last copy of the bucket in the 
redundancy zone
+   *
+   * @param bucket -- bucket to be deleted from the member
+   * @param distributionManager -- used to check members of redundancy zones
+   */
+  public RefusalReason canDelete(Bucket bucket, DistributionManager 
distributionManager) {
+    // This code only applies to Clusters.
+    String myRedundancyZone = distributionManager.getRedundancyZone(memberId);
+
+    if (myRedundancyZone == null) {
+      // Not using redundancy zones, so...
+      return RefusalReason.NONE;
+    }
+
+    for (Member member : bucket.getMembersHosting()) {
+      // Don't look at yourself because you are not redundant for yourself
+      if (member.getMemberId().equals(this.getMemberId())) {
+        continue;
+      }
+
+      String memberRedundancyZone = 
distributionManager.getRedundancyZone(member.memberId);
+      if (memberRedundancyZone == null) {
+        // Not using redundancy zones, so...
+        continue;
+      }
+
+      // Does the member redundancy zone match my redundancy zone?
+      // if so we are not the last in the redundancy zone.
+      if (memberRedundancyZone.equals(myRedundancyZone)) {
+        return RefusalReason.NONE;
+      }
+    }
+
+    return RefusalReason.LAST_MEMBER_IN_ZONE;
+  }
+
+
+  /**
    * @param sourceMember the member we will be moving this bucket off of
    * @param checkZone true if we should not put two copies of a bucket on two 
nodes with the same
    *        IP address.
@@ -213,7 +256,7 @@ public class Member implements Comparable<Member> {
   }
 
   void changeTotalBytes(float change) {
-    totalBytes += (long) Math.round(change);
+    totalBytes += Math.round(change);
   }
 
   @Override
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
index 0457ba3..ec6b3ea 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/PartitionedRegionLoadModel.java
@@ -53,7 +53,7 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  * are assumed to be colocated, and the model adds together the load from each 
of the individual
  * regions to balance all of the regions together.
  *
- * Reblancing operations are performed by repeatedly calling model.nextStep 
until it returns false.
+ * Rebalancing operations are performed by repeatedly calling model.nextStep 
until it returns false.
  * Each call to nextStep should perform another operation. The model will make 
callbacks to the
  * BucketOperator you provide to the contructor perform the actual create or 
move.
  *
@@ -125,9 +125,13 @@ public class PartitionedRegionLoadModel {
   private final BucketOperator operator;
   private final int requiredRedundancy;
 
-  /** The average primary load on a member */
+  /**
+   * The average primary load on a member
+   */
   private float primaryAverage = -1;
-  /** The average bucket load on a member */
+  /**
+   * The average bucket load on a member
+   */
   private float averageLoad = -1;
   /**
    * The minimum improvement in variance that we'll consider worth moving a 
primary
@@ -151,7 +155,8 @@ public class PartitionedRegionLoadModel {
    * @param redundancyLevel The expected redundancy level for the region
    */
   public PartitionedRegionLoadModel(BucketOperator operator, int 
redundancyLevel, int numBuckets,
-      AddressComparor addressComparor, Set<InternalDistributedMember> 
criticalMembers,
+      AddressComparor addressComparor,
+      Set<InternalDistributedMember> criticalMembers,
       PartitionedRegion region) {
     this.operator = operator;
     this.requiredRedundancy = redundancyLevel;
@@ -292,7 +297,7 @@ public class PartitionedRegionLoadModel {
               new Object[] {memberRollup, this.allColocatedRegions,
                   memberRollup.getColocatedMembers().keySet(), 
memberRollup.getBuckets()});
         }
-        for (Bucket bucket : new HashSet<Bucket>(memberRollup.getBuckets())) {
+        for (Bucket bucket : new HashSet<>(memberRollup.getBuckets())) {
           bucket.removeMember(memberRollup);
         }
       }
@@ -473,21 +478,36 @@ public class PartitionedRegionLoadModel {
     Move bestMove = null;
 
     for (Member member : bucket.getMembersHosting()) {
+
+      // If we can't delete it continue (last member of zone)
+      if (!member.canDelete(bucket, 
partitionedRegion.getDistributionManager()).willAccept()) {
+        continue;
+      }
+
+      // if this load is lower than then highest load, we prefer the deleting 
from high
+      // load servers so move on. If this member is the bucket primary, we 
prefer not to move
+      // primaries, so move on.
       float newLoad = (member.getTotalLoad() - bucket.getLoad()) / 
member.getWeight();
-      if (newLoad > mostLoaded && !member.equals(bucket.getPrimary())) {
-        Move move = new Move(null, member, bucket);
-        if (!this.attemptedBucketRemoves.contains(move)) {
-          mostLoaded = newLoad;
-          bestMove = move;
-        }
+      if (newLoad <= mostLoaded || member.equals(bucket.getPrimary())) {
+        continue;
       }
+
+      // if the attemptedBucketRemovesList contains this move, then we don't 
need to add it
+      // again.
+      Move move = new Move(null, member, bucket);
+      if (this.attemptedBucketRemoves.contains(move)) {
+        continue;
+      }
+
+      mostLoaded = newLoad;
+      bestMove = move;
     }
     return bestMove;
   }
 
   public Move findBestTargetForFPR(Bucket bucket, boolean checkIPAddress) {
-    InternalDistributedMember targetMemberID = null;
-    Member targetMember = null;
+    InternalDistributedMember targetMemberID;
+    Member targetMember;
     List<FixedPartitionAttributesImpl> fpas =
         this.partitionedRegion.getFixedPartitionAttributesImpl();
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
index bd71b40..fcb95d2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/rebalance/model/RefusalReason.java
@@ -15,7 +15,13 @@
 package org.apache.geode.internal.cache.partitioned.rebalance.model;
 
 public enum RefusalReason {
-  NONE, ALREADY_HOSTING, UNITIALIZED_MEMBER, SAME_ZONE, LOCAL_MAX_MEMORY_FULL, 
CRITICAL_HEAP;
+  NONE,
+  ALREADY_HOSTING,
+  UNITIALIZED_MEMBER,
+  SAME_ZONE,
+  LAST_MEMBER_IN_ZONE,
+  LOCAL_MAX_MEMORY_FULL,
+  CRITICAL_HEAP;
 
   public boolean willAccept() {
     return this == NONE;
@@ -42,6 +48,11 @@ public enum RefusalReason {
       case CRITICAL_HEAP:
         return "Target member " + target.getMemberId()
             + " has reached its critical heap percentage, and cannot accept 
more data";
+      case LAST_MEMBER_IN_ZONE:
+        return "Target member " + target.getMemberId()
+            + " is the last member of redundancy zone for the bucket "
+            + bucket.getId()
+            + ": " + bucket.getMembersHosting();
       default:
         return this.toString();
     }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java
new file mode 100644
index 0000000..c1db31a
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/MemberTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned.rebalance;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import 
org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
+import 
org.apache.geode.internal.cache.partitioned.rebalance.model.RefusalReason;
+
+public class MemberTest {
+  private AddressComparor addressComparor = mock(AddressComparor.class);
+  private InternalDistributedMember memberId = 
mock(InternalDistributedMember.class);
+  private InternalDistributedMember otherMemberId = 
mock(InternalDistributedMember.class);
+
+  @Test
+  public void testCanDeleteWhenNotLastInZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = 
mock(ClusterDistributionManager.class);
+
+    Member memberUnderTest = new Member(addressComparor, memberId, false, 
false);
+    Member otherMember = new Member(addressComparor, otherMemberId, false, 
false);
+    membersHostingBucket.add(memberUnderTest);
+    membersHostingBucket.add(otherMember);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    
Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+    
Mockito.when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn("zoneA");
+
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.NONE);
+  }
+
+  @Test
+  public void testCanDeleteWhenLastInZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = 
mock(ClusterDistributionManager.class);
+
+    Member memberUnderTest = new Member(addressComparor, memberId, false, 
false);
+    membersHostingBucket.add(memberUnderTest);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    
Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.LAST_MEMBER_IN_ZONE);
+  }
+
+
+  @Test
+  public void testCanDeleteWhenNoZone() {
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Bucket bucket = mock(Bucket.class);
+    ClusterDistributionManager clusterDistributionManager = 
mock(ClusterDistributionManager.class);
+    Member memberUnderTest = new Member(addressComparor, memberId, false, 
false);
+    Member otherMember = new Member(addressComparor, otherMemberId, false, 
false);
+    membersHostingBucket.add(memberUnderTest);
+    membersHostingBucket.add(otherMember);
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    
Mockito.when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn(null);
+    
Mockito.when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn(null);
+
+    assertThat(memberUnderTest.canDelete(bucket, clusterDistributionManager))
+        .isEqualTo(RefusalReason.NONE);
+  }
+
+}
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
index 1d3ce1f..a2bcfcb 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/rebalance/PartitionedRegionLoadModelJUnitTest.java
@@ -14,8 +14,12 @@
  */
 package org.apache.geode.internal.cache.partitioned.rebalance;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -24,7 +28,6 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -39,16 +42,21 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.cache.partition.PartitionMemberInfo;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
 import org.apache.geode.internal.cache.partitioned.OfflineMemberDetailsImpl;
 import org.apache.geode.internal.cache.partitioned.PRLoad;
 import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
 import 
org.apache.geode.internal.cache.partitioned.rebalance.BucketOperator.Completion;
 import 
org.apache.geode.internal.cache.partitioned.rebalance.model.AddressComparor;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Bucket;
+import org.apache.geode.internal.cache.partitioned.rebalance.model.Member;
 import 
org.apache.geode.internal.cache.partitioned.rebalance.model.PartitionedRegionLoadModel;
 import org.apache.geode.internal.cache.persistence.PersistentMemberID;
 import org.apache.geode.internal.inet.LocalHostUtil;
@@ -1489,11 +1497,62 @@ public class PartitionedRegionLoadModelJUnitTest {
     model.addRegion("primary", Arrays.asList(details1, details2), 
offlineDetails, true);
     assertEquals(3, doMoves(new CompositeDirector(true, true, true, true), 
model));
 
-    Collection<Move> expectedMoves = new ArrayList<>();
+    List<Move> expectedMoves = new ArrayList<>();
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
     expectedMoves.add(new Move(member1, member2));
-    assertEquals(expectedMoves, bucketOperator.bucketMoves);
+    assertThat(expectedMoves).containsAll(bucketOperator.bucketMoves);
+  }
+
+  // Two members each host bucket. Fullyloadedmember hosts bucket2 as well.
+  // findBestRemove should do the bucket remove from fullyloadedmember
+  @Test
+  public void testFindBestRemoveRemovesFromMoreLoadedMember() {
+    Bucket bucket = mock(Bucket.class);
+    Bucket bucket2 = mock(Bucket.class);
+    AddressComparor addressComparor = mock(AddressComparor.class);
+    ClusterDistributionManager clusterDistributionManager = 
mock(ClusterDistributionManager.class);
+
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    PartitionedRegionLoadModel model = new 
PartitionedRegionLoadModel(bucketOperator, 2, 6,
+        addressComparor, Collections.emptySet(), partitionedRegion);
+    InternalDistributedMember memberId = mock(InternalDistributedMember.class);
+    InternalDistributedMember otherMemberId = 
mock(InternalDistributedMember.class);
+
+    
when(partitionedRegion.getDistributionManager()).thenReturn(clusterDistributionManager);
+    doReturn(true).when(addressComparor).enforceUniqueZones();
+    doReturn(true).when(addressComparor).areSameZone(
+        ArgumentMatchers.any(InternalDistributedMember.class),
+        ArgumentMatchers.any(InternalDistributedMember.class));
+    Set<Member> membersHostingBucket = new HashSet<>();
+    Member halfLoadedMember = new Member(addressComparor, memberId, 100, 100, 
false, false);
+    Member fullyLoadedMember = new Member(addressComparor, otherMemberId, 100, 
100, false, false);
+
+    when(bucket.getPrimary()).thenReturn(halfLoadedMember);
+    when(bucket.getBytes()).thenReturn(10000000L);
+    when(bucket.getLoad()).thenReturn(50.0f);
+    when(bucket2.getPrimary()).thenReturn(fullyLoadedMember);
+    when(bucket2.getBytes()).thenReturn(10000000L);
+    when(bucket2.getLoad()).thenReturn(50.0f);
+
+    fullyLoadedMember.addBucket(bucket);
+    fullyLoadedMember.addBucket(bucket2);
+    halfLoadedMember.addBucket(bucket);
+    membersHostingBucket.add(halfLoadedMember);
+    membersHostingBucket.add(fullyLoadedMember);
+
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    
when(clusterDistributionManager.getRedundancyZone(memberId)).thenReturn("zoneA");
+    
when(clusterDistributionManager.getRedundancyZone(otherMemberId)).thenReturn("zoneA");
+
+    when(bucket.getMembersHosting()).thenReturn(membersHostingBucket);
+
+    org.apache.geode.internal.cache.partitioned.rebalance.model.Move 
bestRemove =
+        model.findBestRemove(bucket);
+
+    assertThat(bestRemove).isNotNull();
+    assertThat(bestRemove.getTarget()).isEqualTo(fullyLoadedMember);
   }
 
   private int doMoves(RebalanceDirector director, PartitionedRegionLoadModel 
model) {

Reply via email to