This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 533c84d  HBASE-25739 TableSkewCostFunction need to use aggregated 
deviation (#3067)
533c84d is described below

commit 533c84d330f7314cca4f2f14b4cf6b8454eafb0a
Author: clarax <clarax98...@gmail.com>
AuthorDate: Thu Apr 15 13:12:07 2021 -0700

    HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067)
    
    Signed-off-by: Michael Stack <st...@apache.org>
    Reviewed-by: David Manning <david.mann...@salesforce.com>
---
 .../hbase/master/balancer/BaseLoadBalancer.java    | 120 +++++++++++++++++----
 .../master/balancer/StochasticLoadBalancer.java    | 101 ++++++-----------
 .../master/balancer/TestBaseLoadBalancer.java      |   4 +-
 3 files changed, 131 insertions(+), 94 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 1466570..b411636 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -162,7 +162,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
     int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
-    int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
+    int[] numRegionsPerTable; // tableIndex -> region count
+    double[] meanRegionsPerTable; // tableIndex -> mean region count per RS
+    double regionSkewByTable;       // sum of |regions on RS - mean| over all RSs and tables
+    double minRegionSkewByTable;       // lower bound of regionSkewByTable (even placement)
+    double maxRegionSkewByTable;       // upper bound of regionSkewByTable (table on one RS)
+
     int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
     boolean hasRegionReplicas = false;   //whether there is regions with replicas
 
@@ -370,6 +375,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
       numTables = tables.size();
       numRegionsPerServerPerTable = new int[numServers][numTables];
+      numRegionsPerTable = new int[numTables];
 
       for (int i = 0; i < numServers; i++) {
         for (int j = 0; j < numTables; j++) {
@@ -377,21 +383,29 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
         }
       }
 
+      for (int i = 0; i < numTables; i++) {
+        numRegionsPerTable[i] = 0;
+      }
+
       for (int i=0; i < regionIndexToServerIndex.length; i++) {
         if (regionIndexToServerIndex[i] >= 0) {
           numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
+          numRegionsPerTable[regionIndexToTableIndex[i]]++;
         }
       }
 
-      numMaxRegionsPerTable = new int[numTables];
-      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
-        for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
-          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
-            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
-          }
-        }
+      // Avoid repeated computation for planning
+      meanRegionsPerTable = new double[numTables];
+      maxRegionSkewByTable = 0;
+      minRegionSkewByTable = 0;
+      for (int i = 0; i < numTables; i++) {
+        meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers;
+        minRegionSkewByTable += Cluster.getMinSkew(numRegionsPerTable[i], numServers);
+        maxRegionSkewByTable += Cluster.getMaxSkew(numRegionsPerTable[i], numServers);
       }
 
+      computeRegionSkewPerTable();
+
       for (int i = 0; i < regions.length; i ++) {
         RegionInfo info = regions[i];
         if (RegionReplicaUtil.isDefaultReplica(info)) {
@@ -517,6 +531,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
 
     /**
+     * Return the min skew of distribution
+     */
+    public static double getMinSkew(double total, double numServers) {
+      double mean = total / numServers;
+      // It's possible that there aren't enough regions to go around
+      double min;
+      if (numServers > total) {
+        min = ((numServers - total) * mean + (1 - mean) * total) ;
+      } else {
+        // Some will have 1 more than everything else.
+        int numHigh = (int) (total - (Math.floor(mean) * numServers));
+        int numLow = (int) (numServers - numHigh);
+        min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean));
+      }
+      return min;
+    }
+
+    /**
+     * Return the max deviation of distribution
+     * Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
+     * a zero sum cost for this to make sense.
+     */
+    public static double getMaxSkew(double total, double numServers) {
+      double mean = total / numServers;
+      return (total - mean) + (numServers - 1) * mean;
+    }
+
+    /**
+     * Scale the value between 0 and 1.
+     *
+     * @param min   Min value
+     * @param max   The Max value
+     * @param value The value to be scaled.
+     * @return The scaled value.
+     */
+    public static double scale(double min, double max, double value) {
+      if (max <= min || value <= min) {
+        return 0;
+      }
+      if ((max - min) == 0) {
+        return 0;
+      }
+
+      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
+    }
+
+    /**
      * Retrieves and lazily initializes a field storing the locality of
      * every region/server combination
      */
@@ -574,6 +635,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     }
 
     /**
+     * Recompute the region skew during init or plan of moves.
+     */
+    private void computeRegionSkewPerTable() {
+      // reinitialize for recomputation
+      regionSkewByTable = 0;
+
+      for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
+        for (int tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) {
+          regionSkewByTable += Math.abs(aNumRegionsPerServerPerTable[tableIndex]
+            - meanRegionsPerTable[tableIndex]);
+        }
+      }
+    }
+
+    /**
      * Computes and caches the locality for each region/rack combinations,
      * as well as storing a mapping of region -> server and region -> rack such that server
      * and rack have the highest locality for region
@@ -828,22 +904,20 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       int tableIndex = regionIndexToTableIndex[region];
       if (oldServer >= 0) {
         numRegionsPerServerPerTable[oldServer][tableIndex]--;
+        // update regionSkewPerTable for the move from old server
+        regionSkewByTable +=
+            Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex]
+              - meanRegionsPerTable[tableIndex])
+              - Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + 1
+              - meanRegionsPerTable[tableIndex]);
       }
       numRegionsPerServerPerTable[newServer][tableIndex]++;
-
-      //check whether this caused maxRegionsPerTable in the new Server to be updated
-      if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) {
-        numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex];
-      } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1)
-          == numMaxRegionsPerTable[tableIndex]) {
-        //recompute maxRegionsPerTable since the previous value was coming from the old server
-        numMaxRegionsPerTable[tableIndex] = 0;
-        for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) {
-          if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) {
-            numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex];
-          }
-        }
-      }
+      // update regionSkewPerTable for the move to new server
+      regionSkewByTable +=
+        Math.abs(numRegionsPerServerPerTable[newServer][tableIndex]
+          - meanRegionsPerTable[tableIndex])
+          - Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] - 1
+          - meanRegionsPerTable[tableIndex]);
 
       // update for servers
       int primary = regionIndexToPrimaryIndex[region];
@@ -1013,7 +1087,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
           .append(Arrays.toString(serverIndicesSortedByRegionCount))
           .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer));
 
-      desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable))
+      desc.append(", regionSkewByTable=").append(regionSkewByTable)
           .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers)
           .append(", numTables=").append(numTables).append(", numMovedRegions=")
           .append(numMovedRegions).append('}');
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index b40779a..e8a7788 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -762,6 +762,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     boolean isNeeded() {
       return true;
     }
+
     float getMultiplier() {
       return multiplier;
     }
@@ -770,35 +771,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       this.multiplier = m;
     }
 
-    /** Called once per LB invocation to give the cost function
+    /**
+     * Called once per LB invocation to give the cost function
      * to initialize it's state, and perform any costly calculation.
      */
     void init(Cluster cluster) {
       this.cluster = cluster;
     }
 
-    /** Called once per cluster Action to give the cost function
+    /**
+     * Called once per cluster Action to give the cost function
      * an opportunity to update it's state. postAction() is always
      * called at least once before cost() is called with the cluster
-     * that this action is performed on. */
+     * that this action is performed on.
+     */
     void postAction(Action action) {
       switch (action.type) {
-      case NULL: break;
-      case ASSIGN_REGION:
-        AssignRegionAction ar = (AssignRegionAction) action;
-        regionMoved(ar.region, -1, ar.server);
-        break;
-      case MOVE_REGION:
-        MoveRegionAction mra = (MoveRegionAction) action;
-        regionMoved(mra.region, mra.fromServer, mra.toServer);
-        break;
-      case SWAP_REGIONS:
-        SwapRegionsAction a = (SwapRegionsAction) action;
-        regionMoved(a.fromRegion, a.fromServer, a.toServer);
-        regionMoved(a.toRegion, a.toServer, a.fromServer);
-        break;
-      default:
-        throw new RuntimeException("Uknown action:" + action.type);
+        case NULL:
+          break;
+        case ASSIGN_REGION:
+          AssignRegionAction ar = (AssignRegionAction) action;
+          regionMoved(ar.region, -1, ar.server);
+          break;
+        case MOVE_REGION:
+          MoveRegionAction mra = (MoveRegionAction) action;
+          regionMoved(mra.region, mra.fromServer, mra.toServer);
+          break;
+        case SWAP_REGIONS:
+          SwapRegionsAction a = (SwapRegionsAction) action;
+          regionMoved(a.fromRegion, a.fromServer, a.toServer);
+          regionMoved(a.toRegion, a.toServer, a.fromServer);
+          break;
+        default:
+          throw new RuntimeException("Uknown action:" + action.type);
       }
     }
 
@@ -822,59 +827,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       double total = getSum(stats);
 
       double count = stats.length;
-      double mean = total/count;
-
-      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
-      // a zero sum cost for this to make sense.
-      double max = ((count - 1) * mean) + (total - mean);
-
-      // It's possible that there aren't enough regions to go around
-      double min;
-      if (count > total) {
-        min = ((count - total) * mean) + ((1 - mean) * total);
-      } else {
-        // Some will have 1 more than everything else.
-        int numHigh = (int) (total - (Math.floor(mean) * count));
-        int numLow = (int) (count - numHigh);
-
-        min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean)));
+      double mean = total / count;
 
-      }
-      min = Math.max(0, min);
-      for (int i=0; i<stats.length; i++) {
+      for (int i = 0; i < stats.length; i++) {
         double n = stats[i];
         double diff = Math.abs(mean - n);
         totalCost += diff;
       }
 
-      double scaled =  scale(min, max, totalCost);
-      return scaled;
+      return Cluster
+        .scale(Cluster.getMinSkew(total, count), Cluster.getMaxSkew(total, count), totalCost);
     }
 
     private double getSum(double[] stats) {
       double total = 0;
-      for(double s:stats) {
+      for (double s : stats) {
         total += s;
       }
       return total;
     }
-
-    /**
-     * Scale the value between 0 and 1.
-     *
-     * @param min   Min value
-     * @param max   The Max value
-     * @param value The value to be scaled.
-     * @return The scaled value.
-     */
-    protected double scale(double min, double max, double value) {
-      if (max <= min || value <= min) {
-        return 0;
-      }
-      if ((max - min) == 0) return 0;
-
-      return Math.max(0d, Math.min(1d, (value - min) / (max - min)));
-    }
   }
 
   /**
@@ -927,7 +898,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         return 1000000;   // return a number much greater than any of the other cost
       }
 
-      return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
+      return Cluster.scale(0, Math.min(cluster.numRegions, maxMoves), moveCost);
     }
   }
 
@@ -1035,15 +1006,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
 
     @Override
     protected double cost() {
-      double max = cluster.numRegions;
-      double min = ((double) cluster.numRegions) / cluster.numServers;
-      double value = 0;
-
-      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
-        value += cluster.numMaxRegionsPerTable[i];
-      }
-
-      return scale(min, max, value);
+      return Cluster.scale(cluster.minRegionSkewByTable, cluster.maxRegionSkewByTable, cluster.regionSkewByTable);
     }
   }
 
@@ -1366,7 +1329,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       for (int i = 0 ; i < costsPerGroup.length; i++) {
         totalCost += costsPerGroup[i];
       }
-      return scale(0, maxCost, totalCost);
+      return Cluster.scale(0, maxCost, totalCost);
     }
 
     /**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
index fdac676..f82a96a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java
@@ -390,8 +390,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase {
 
     // now move region1 from servers[0] to servers[2]
     cluster.doAction(new MoveRegionAction(0, 0, 2));
-    // check that the numMaxRegionsPerTable for "table" has increased to 2
-    assertEquals(2, cluster.numMaxRegionsPerTable[0]);
+    // check that the regionSkewByTable for "table" has increased to 2
+    assertEquals(2, cluster.regionSkewByTable, 0.01);
     // now repeat check whether moving region1 from servers[1] to servers[2]
     // would lower availability
     assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));

Reply via email to