This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a8846  HBASE-26309 Balancer tends to move regions to the server at the end of list (#3723)
72a8846 is described below

commit 72a88468a8812e94feb19364d95089a29a8487e6
Author: clarax <clarax98...@gmail.com>
AuthorDate: Tue Nov 2 06:56:38 2021 -0700

    HBASE-26309 Balancer tends to move regions to the server at the end of list (#3723)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../master/balancer/BalancerClusterState.java      |  8 ++++
 .../master/balancer/LoadCandidateGenerator.java    | 51 +++++++++++++++++-----
 .../master/balancer/StochasticLoadBalancer.java    |  6 +--
 .../balancer/StochasticBalancerTestBase.java       | 38 +++++++++++++++-
 ...ochasticLoadBalancerRegionReplicaWithRacks.java | 40 +++++++++++++++--
 5 files changed, 122 insertions(+), 21 deletions(-)

diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
index c69f17c..dbe23ef 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/BalancerClusterState.java
@@ -179,6 +179,7 @@ class BalancerClusterState {
       serversPerHostList.get(hostIndex).add(serverIndex);
 
       String rack = this.rackManager.getRack(sn);
+
       if (!racksToIndex.containsKey(rack)) {
         racksToIndex.put(rack, numRacks++);
         serversPerRackList.add(new ArrayList<>());
@@ -187,6 +188,7 @@ class BalancerClusterState {
       serversPerRackList.get(rackIndex).add(serverIndex);
     }
 
+    LOG.debug("Hosts are {}, racks are {}", hostsToIndex, racksToIndex);
     // Count how many regions there are.
      for (Map.Entry<ServerName, List<RegionInfo>> entry : clusterState.entrySet()) {
       numRegions += entry.getValue().size();
@@ -285,6 +287,7 @@ class BalancerClusterState {
       serversPerHost[i] = new int[serversPerHostList.get(i).size()];
       for (int j = 0; j < serversPerHost[i].length; j++) {
         serversPerHost[i][j] = serversPerHostList.get(i).get(j);
+        LOG.debug("server {} is on host {}", serversPerHostList.get(i).get(j), i);
       }
       if (serversPerHost[i].length > 1) {
         multiServersPerHost = true;
@@ -295,6 +298,7 @@ class BalancerClusterState {
       serversPerRack[i] = new int[serversPerRackList.get(i).size()];
       for (int j = 0; j < serversPerRack[i].length; j++) {
         serversPerRack[i][j] = serversPerRackList.get(i).get(j);
+        LOG.info("server {} is on rack {}", serversPerRackList.get(i).get(j), i);
       }
     }
 
@@ -792,6 +796,10 @@ class BalancerClusterState {
 
   private Comparator<Integer> numRegionsComparator = Comparator.comparingInt(this::getNumRegions);
 
+  public Comparator<Integer> getNumRegionsComparator() {
+    return numRegionsComparator;
+  }
+
   int getLowestLocalityRegionOnServer(int serverIndex) {
     if (regionFinder != null) {
       float lowestLocality = 1.0f;
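
The getNumRegionsComparator() accessor added above exposes the comparator that
already orders serverIndicesSortedByRegionCount, so callers can detect ties in
region count. A hedged usage sketch (illustrative only, not verbatim HBase
code; it mirrors how LoadCandidateGenerator consumes it below):

    // Two adjacent entries of the sorted array compare equal exactly when the
    // corresponding servers host the same number of regions.
    Integer[] servers = cluster.serverIndicesSortedByRegionCount;
    boolean tied = cluster.getNumRegionsComparator().compare(servers[0], servers[1]) == 0;
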
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java
index 595e185..8604f4a 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/LoadCandidateGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
@@ -34,27 +35,53 @@ class LoadCandidateGenerator extends CandidateGenerator {
   private int pickLeastLoadedServer(final BalancerClusterState cluster, int thisServer) {
     Integer[] servers = cluster.serverIndicesSortedByRegionCount;
 
-    int index = 0;
-    while (servers[index] == null || servers[index] == thisServer) {
-      index++;
-      if (index == servers.length) {
-        return -1;
+    int selectedIndex = -1;
+    double currentLargestRandom = -1;
+    for (int i = 0; i < servers.length; i++) {
+      if (servers[i] == null || servers[i] == thisServer) {
+        continue;
+      }
+      if (selectedIndex != -1
+        && cluster.getNumRegionsComparator().compare(servers[i], servers[selectedIndex]) != 0) {
+        // Exhausted servers of the same region count
+        break;
+      }
+      // We don't know in advance how many servers are tied at this region count, so we pick one
+      // uniformly with a simplified inline reservoir sample: assign each candidate a random
+      // number and keep the greatest. (http://gregable.com/2007/10/reservoir-sampling.html)
+      double currentRandom = ThreadLocalRandom.current().nextDouble();
+      if (currentRandom > currentLargestRandom) {
+        selectedIndex = i;
+        currentLargestRandom = currentRandom;
       }
     }
-    return servers[index];
+    return selectedIndex == -1 ? -1 : servers[selectedIndex];
   }
 
   private int pickMostLoadedServer(final BalancerClusterState cluster, int thisServer) {
     Integer[] servers = cluster.serverIndicesSortedByRegionCount;
 
-    int index = servers.length - 1;
-    while (servers[index] == null || servers[index] == thisServer) {
-      index--;
-      if (index < 0) {
-        return -1;
+    int selectedIndex = -1;
+    double currentLargestRandom = -1;
+    for (int i = servers.length - 1; i >= 0; i--) {
+      if (servers[i] == null || servers[i] == thisServer) {
+        continue;
+      }
+      if (selectedIndex != -1 && cluster.getNumRegionsComparator().compare(servers[i],
+        servers[selectedIndex]) != 0) {
+        // Exhausted servers of the same region count
+        break;
+      }
+      // We don't know in advance how many servers are tied at this region count, so we pick one
+      // uniformly with a simplified inline reservoir sample: assign each candidate a random
+      // number and keep the greatest. (http://gregable.com/2007/10/reservoir-sampling.html)
+      double currentRandom = ThreadLocalRandom.current().nextDouble();
+      if (currentRandom > currentLargestRandom) {
+        selectedIndex = i;
+        currentLargestRandom = currentRandom;
       }
     }
-    return servers[index];
+    return selectedIndex == -1 ? -1 : servers[selectedIndex];
   }
 
 }
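
The loops above keep, among all candidates tied at the best region count, the
one that draws the largest random number. This is k=1 reservoir sampling: each
of n tied servers has probability 1/n of drawing the maximum of n i.i.d.
uniform values, which removes the old bias toward a fixed end of the sorted
array. A minimal standalone sketch of the idea (illustrative, not HBase code;
class and variable names are made up):

    import java.util.concurrent.ThreadLocalRandom;

    public class ReservoirPickSketch {
      // Picks one element uniformly from candidates in a single pass, without
      // knowing the number of candidates in advance.
      static int pickUniformly(int[] candidates) {
        int selected = -1;
        double largestRandom = -1;
        for (int i = 0; i < candidates.length; i++) {
          double r = ThreadLocalRandom.current().nextDouble();
          if (r > largestRandom) {
            largestRandom = r;
            selected = i;
          }
        }
        return selected == -1 ? -1 : candidates[selected];
      }

      public static void main(String[] args) {
        int[] tiedServers = {3, 7, 11}; // hypothetical tied server indexes
        System.out.println("picked server " + pickUniformly(tiedServers));
      }
    }
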
diff --git a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 11eb85b..a796085 100644
--- a/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-balancer/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -345,8 +345,6 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     }
 
     if (idleRegionServerExist(cluster)){
-      LOG.info("Running balancer because at least one server hosts replicas of the same region." +
-        "regionReplicaRackCostFunction={}", regionReplicaRackCostFunction.cost());
       LOG.info("Running balancer because cluster has idle server(s)."+
         " function cost={}", functionCost());
       return true;
@@ -510,9 +508,9 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       LOG.info("Finished computing new moving plan. Computation took {} ms" +
           " to try {} different iterations.  Found a solution that moves " +
           "{} regions; Going from a computed imbalance of {}" +
-          " to a new imbalance of {}. ",
+          " to a new imbalance of {}. functionCost={}",
         endTime - startTime, step, plans.size(),
-        initCost / sumMultiplier, currentCost / sumMultiplier);
+        initCost / sumMultiplier, currentCost / sumMultiplier, functionCost());
       sendRegionPlansToRingBuffer(plans, currentCost, initCost, initFunctionTotalCosts, step);
       return plans;
     }
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java
index 56d5f10..9237968 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/StochasticBalancerTestBase.java
@@ -80,7 +80,7 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
       List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
 
       // Print out the cluster loads to make debugging easier.
-      LOG.info("Mock Balance : " + printMock(balancedCluster));
+      LOG.info("Mock after Balance : " + printMock(balancedCluster));
 
       if (assertFullyBalanced) {
         assertClusterAsBalanced(balancedCluster);
@@ -95,4 +95,40 @@ public class StochasticBalancerTestBase extends BalancerTestBase {
       }
     }
   }
+
+  protected void testWithClusterWithIteration(Map<ServerName, List<RegionInfo>> serverMap,
+    RackManager rackManager, boolean assertFullyBalanced, boolean assertFullyBalancedForReplicas) {
+    List<ServerAndLoad> list = convertToList(serverMap);
+    LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+
+    loadBalancer.setRackManager(rackManager);
+    // Run the balancer.
+    Map<TableName, Map<ServerName, List<RegionInfo>>> loadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
+    List<RegionPlan> plans = loadBalancer.balanceCluster(loadOfAllTable);
+    assertNotNull("Initial cluster balance should produce plans.", plans);
+
+    List<ServerAndLoad> balancedCluster = null;
+    // Iterate until the balancer yields no more plans; otherwise the test is killed by timeout.
+    while (plans != null && (assertFullyBalanced || assertFullyBalancedForReplicas)) {
+      // Apply the plan to the mock cluster.
+      balancedCluster = reconcile(list, plans, serverMap);
+
+      // Print out the cluster loads to make debugging easier.
+      LOG.info("Mock after balance: " + printMock(balancedCluster));
+
+      loadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
+      plans = loadBalancer.balanceCluster(loadOfAllTable);
+    }
+
+    // Print out the cluster loads to make debugging easier.
+    LOG.info("Mock Final balance: " + printMock(balancedCluster));
+
+    if (assertFullyBalanced) {
+      assertNull("Given a requirement to be fully balanced, second attempt at plans should " +
+        "produce none.", plans);
+    }
+    if (assertFullyBalancedForReplicas) {
+      assertRegionReplicaPlacement(serverMap, rackManager);
+    }
+  }
 }
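
The helper above reapplies plans until balanceCluster() returns null. If the
balancer ever failed to converge, the loop would only stop at the test
timeout; a hedged variant (an assumption, not part of this patch) bounds the
iteration count explicitly:

    int maxIterations = 1000; // hypothetical safety cap
    int iteration = 0;
    while (plans != null && iteration++ < maxIterations) {
      balancedCluster = reconcile(list, plans, serverMap);
      loadOfAllTable = (Map) mockClusterServersWithTables(serverMap);
      plans = loadBalancer.balanceCluster(loadOfAllTable);
    }
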
diff --git a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
index 3b2c847..0e5ecd3 100644
--- a/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
+++ b/hbase-balancer/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancerRegionReplicaWithRacks.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.master.balancer;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -37,25 +38,36 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
       HBaseClassTestRule.forClass(TestStochasticLoadBalancerRegionReplicaWithRacks.class);
 
   private static class ForTestRackManager extends RackManager {
+
     int numRacks;
+    Map<String, Integer> serverIndexes = new HashMap<String, Integer>();
+    int numServers = 0;
 
     public ForTestRackManager(int numRacks) {
       this.numRacks = numRacks;
     }
 
+
     @Override
     public String getRack(ServerName server) {
-      return "rack_" + (server.hashCode() % numRacks);
+      String key = server.getServerName();
+      if (!serverIndexes.containsKey(key)) {
+        serverIndexes.put(key, numServers++);
+      }
+      return "rack_" + serverIndexes.get(key) % numRacks;
     }
   }
 
   @Test
   public void testRegionReplicationOnMidClusterWithRacks() {
-    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 10000000L);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 100000000L);
+    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", true);
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    // for full balance
+//    conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.001f);
     loadBalancer.onConfigurationChange(conf);
-    int numNodes = 4;
+    int numNodes = 5;
     int numRegions = numNodes * 1;
     int replication = 3; // 3 replicas per region
     int numRegionsPerServer = 1;
@@ -65,6 +77,26 @@ public class TestStochasticLoadBalancerRegionReplicaWithRacks extends Stochastic
         createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
     RackManager rm = new ForTestRackManager(numRacks);
 
-    testWithCluster(serverMap, rm, false, true);
+    testWithClusterWithIteration(serverMap, rm, true, true);
+  }
+
+  @Test
+  public void testRegionReplicationOnLargeClusterWithRacks() {
+    conf.setBoolean("hbase.master.balancer.stochastic.runMaxSteps", false);
+    conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 5000L);
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
+    conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 10 * 1000); // 10 sec
+    loadBalancer.onConfigurationChange(conf);
+    int numNodes = 100;
+    int numRegions = numNodes * 30;
+    int replication = 3; // 3 replicas per region
+    int numRegionsPerServer = 28;
+    int numTables = 1;
+    int numRacks = 4; // all replicas should be on a different rack
+    Map<ServerName, List<RegionInfo>> serverMap =
+      createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
+    RackManager rm = new ForTestRackManager(numRacks);
+
+    testWithClusterWithIteration(serverMap, rm, true, true);
   }
 }
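
On the ForTestRackManager change: a hashCode() can be negative, and Java's %
operator keeps the sign of its left operand, so the old
"rack_" + (server.hashCode() % numRacks) mapping could produce rack names like
"rack_-2" and spread servers across racks unevenly. A small standalone
illustration (the server name string is made up):

    public class RackModuloSketch {
      public static void main(String[] args) {
        int numRacks = 4;
        // A String hash can come out negative.
        int hash = "srv-1.example.com,16020,1635862598000".hashCode();
        // With a negative hash this prints something like "rack_-3".
        System.out.println("rack_" + (hash % numRacks));
      }
    }

The replacement assigns each server a dense first-seen index and takes it
modulo numRacks, which is always non-negative and cycles servers across racks
deterministically.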
