This is an automated email from the ASF dual-hosted git repository. ndimiduk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push: new 533c84d HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067) 533c84d is described below commit 533c84d330f7314cca4f2f14b4cf6b8454eafb0a Author: clarax <clarax98...@gmail.com> AuthorDate: Thu Apr 15 13:12:07 2021 -0700 HBASE-25739 TableSkewCostFunction need to use aggregated deviation (#3067) Signed-off-by: Michael Stack <st...@apache.org> Reviewed-by: David Manning <david.mann...@salesforce.com> --- .../hbase/master/balancer/BaseLoadBalancer.java | 120 +++++++++++++++++---- .../master/balancer/StochasticLoadBalancer.java | 101 ++++++----------- .../master/balancer/TestBaseLoadBalancer.java | 4 +- 3 files changed, 131 insertions(+), 94 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java index 1466570..b411636 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java @@ -162,7 +162,12 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state) int[] regionIndexToTableIndex; //regionIndex -> tableIndex int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions - int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS + int[] numRegionsPerTable; // tableIndex -> region count + double[] meanRegionsPerTable; // mean region count per table + double regionSkewByTable; // skew on RS per by table + double minRegionSkewByTable; // min skew on RS per by table + double maxRegionSkewByTable; // max skew on RS per by table + int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary boolean hasRegionReplicas = false; //whether there is regions with replicas @@ -370,6 +375,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { numTables = tables.size(); numRegionsPerServerPerTable = new int[numServers][numTables]; + numRegionsPerTable = new int[numTables]; for (int i = 0; i < numServers; i++) { for (int j = 0; j < numTables; j++) { @@ -377,21 +383,29 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } } + for (int i = 0; i < numTables; i++) { + numRegionsPerTable[i] = 0; + } + for (int i=0; i < regionIndexToServerIndex.length; i++) { if (regionIndexToServerIndex[i] >= 0) { numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++; + numRegionsPerTable[regionIndexToTableIndex[i]]++; } } - numMaxRegionsPerTable = new int[numTables]; - for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { - for (tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { - if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; - } - } + // Avoid repeated computation for planning + meanRegionsPerTable = new double[numTables]; + maxRegionSkewByTable = 0; + minRegionSkewByTable = 0; + for (int i = 0; i < numTables; i++) { + meanRegionsPerTable[i] = Double.valueOf(numRegionsPerTable[i]) / numServers; + minRegionSkewByTable += Cluster.getMinSkew(numRegionsPerTable[i], numServers); + maxRegionSkewByTable += Cluster.getMaxSkew(numRegionsPerTable[i], numServers); } + computeRegionSkewPerTable(); + for (int i = 0; i < regions.length; i ++) { RegionInfo info = regions[i]; if (RegionReplicaUtil.isDefaultReplica(info)) { @@ -517,6 +531,53 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } /** + * Return the min skew of distribution + */ + public static double getMinSkew(double total, double numServers) { + double mean = total / numServers; + // It's possible that there aren't enough regions to go around + double min; + if (numServers > total) { + min = ((numServers - total) * mean + (1 - mean) * total) ; + } else { + // Some will have 1 more than everything else. + int numHigh = (int) (total - (Math.floor(mean) * numServers)); + int numLow = (int) (numServers - numHigh); + min = numHigh * (Math.ceil(mean) - mean) + numLow * (mean - Math.floor(mean)); + } + return min; + } + + /** + * Return the max deviation of distribution + * Compute max as if all region servers had 0 and one had the sum of all costs. This must be + * a zero sum cost for this to make sense. + */ + public static double getMaxSkew(double total, double numServers) { + double mean = total / numServers; + return (total - mean) + (numServers - 1) * mean; + } + + /** + * Scale the value between 0 and 1. + * + * @param min Min value + * @param max The Max value + * @param value The value to be scaled. + * @return The scaled value. + */ + public static double scale(double min, double max, double value) { + if (max <= min || value <= min) { + return 0; + } + if ((max - min) == 0) { + return 0; + } + + return Math.max(0d, Math.min(1d, (value - min) / (max - min))); + } + + /** * Retrieves and lazily initializes a field storing the locality of * every region/server combination */ @@ -574,6 +635,21 @@ public abstract class BaseLoadBalancer implements LoadBalancer { } /** + * Recompute the region skew during init or plan of moves. + */ + private void computeRegionSkewPerTable() { + // reinitialize for recomputation + regionSkewByTable = 0; + + for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { + for (int tableIndex = 0; tableIndex < aNumRegionsPerServerPerTable.length; tableIndex++) { + regionSkewByTable += Math.abs(aNumRegionsPerServerPerTable[tableIndex] + - meanRegionsPerTable[tableIndex]); + } + } + } + + /** * Computes and caches the locality for each region/rack combinations, * as well as storing a mapping of region -> server and region -> rack such that server * and rack have the highest locality for region @@ -828,22 +904,20 @@ public abstract class BaseLoadBalancer implements LoadBalancer { int tableIndex = regionIndexToTableIndex[region]; if (oldServer >= 0) { numRegionsPerServerPerTable[oldServer][tableIndex]--; + // update regionSkewPerTable for the move from old server + regionSkewByTable += + Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + - meanRegionsPerTable[tableIndex]) + - Math.abs(numRegionsPerServerPerTable[oldServer][tableIndex] + 1 + - meanRegionsPerTable[tableIndex]); } numRegionsPerServerPerTable[newServer][tableIndex]++; - - //check whether this caused maxRegionsPerTable in the new Server to be updated - if (numRegionsPerServerPerTable[newServer][tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = numRegionsPerServerPerTable[newServer][tableIndex]; - } else if (oldServer >= 0 && (numRegionsPerServerPerTable[oldServer][tableIndex] + 1) - == numMaxRegionsPerTable[tableIndex]) { - //recompute maxRegionsPerTable since the previous value was coming from the old server - numMaxRegionsPerTable[tableIndex] = 0; - for (int[] aNumRegionsPerServerPerTable : numRegionsPerServerPerTable) { - if (aNumRegionsPerServerPerTable[tableIndex] > numMaxRegionsPerTable[tableIndex]) { - numMaxRegionsPerTable[tableIndex] = aNumRegionsPerServerPerTable[tableIndex]; - } - } - } + // update regionSkewPerTable for the move to new server + regionSkewByTable += + Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] + - meanRegionsPerTable[tableIndex]) + - Math.abs(numRegionsPerServerPerTable[newServer][tableIndex] - 1 + - meanRegionsPerTable[tableIndex]); // update for servers int primary = regionIndexToPrimaryIndex[region]; @@ -1013,7 +1087,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer { .append(Arrays.toString(serverIndicesSortedByRegionCount)) .append(", regionsPerServer=").append(Arrays.deepToString(regionsPerServer)); - desc.append(", numMaxRegionsPerTable=").append(Arrays.toString(numMaxRegionsPerTable)) + desc.append(", regionSkewByTable=").append(regionSkewByTable) .append(", numRegions=").append(numRegions).append(", numServers=").append(numServers) .append(", numTables=").append(numTables).append(", numMovedRegions=") .append(numMovedRegions).append('}'); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java index b40779a..e8a7788 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java @@ -762,6 +762,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { boolean isNeeded() { return true; } + float getMultiplier() { return multiplier; } @@ -770,35 +771,39 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { this.multiplier = m; } - /** Called once per LB invocation to give the cost function + /** + * Called once per LB invocation to give the cost function * to initialize it's state, and perform any costly calculation. */ void init(Cluster cluster) { this.cluster = cluster; } - /** Called once per cluster Action to give the cost function + /** + * Called once per cluster Action to give the cost function * an opportunity to update it's state. postAction() is always * called at least once before cost() is called with the cluster - * that this action is performed on. */ + * that this action is performed on. + */ void postAction(Action action) { switch (action.type) { - case NULL: break; - case ASSIGN_REGION: - AssignRegionAction ar = (AssignRegionAction) action; - regionMoved(ar.region, -1, ar.server); - break; - case MOVE_REGION: - MoveRegionAction mra = (MoveRegionAction) action; - regionMoved(mra.region, mra.fromServer, mra.toServer); - break; - case SWAP_REGIONS: - SwapRegionsAction a = (SwapRegionsAction) action; - regionMoved(a.fromRegion, a.fromServer, a.toServer); - regionMoved(a.toRegion, a.toServer, a.fromServer); - break; - default: - throw new RuntimeException("Uknown action:" + action.type); + case NULL: + break; + case ASSIGN_REGION: + AssignRegionAction ar = (AssignRegionAction) action; + regionMoved(ar.region, -1, ar.server); + break; + case MOVE_REGION: + MoveRegionAction mra = (MoveRegionAction) action; + regionMoved(mra.region, mra.fromServer, mra.toServer); + break; + case SWAP_REGIONS: + SwapRegionsAction a = (SwapRegionsAction) action; + regionMoved(a.fromRegion, a.fromServer, a.toServer); + regionMoved(a.toRegion, a.toServer, a.fromServer); + break; + default: + throw new RuntimeException("Uknown action:" + action.type); } } @@ -822,59 +827,25 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { double total = getSum(stats); double count = stats.length; - double mean = total/count; - - // Compute max as if all region servers had 0 and one had the sum of all costs. This must be - // a zero sum cost for this to make sense. - double max = ((count - 1) * mean) + (total - mean); - - // It's possible that there aren't enough regions to go around - double min; - if (count > total) { - min = ((count - total) * mean) + ((1 - mean) * total); - } else { - // Some will have 1 more than everything else. - int numHigh = (int) (total - (Math.floor(mean) * count)); - int numLow = (int) (count - numHigh); - - min = (numHigh * (Math.ceil(mean) - mean)) + (numLow * (mean - Math.floor(mean))); + double mean = total / count; - } - min = Math.max(0, min); - for (int i=0; i<stats.length; i++) { + for (int i = 0; i < stats.length; i++) { double n = stats[i]; double diff = Math.abs(mean - n); totalCost += diff; } - double scaled = scale(min, max, totalCost); - return scaled; + return Cluster + .scale(Cluster.getMinSkew(total, count), Cluster.getMaxSkew(total, count), totalCost); } private double getSum(double[] stats) { double total = 0; - for(double s:stats) { + for (double s : stats) { total += s; } return total; } - - /** - * Scale the value between 0 and 1. - * - * @param min Min value - * @param max The Max value - * @param value The value to be scaled. - * @return The scaled value. - */ - protected double scale(double min, double max, double value) { - if (max <= min || value <= min) { - return 0; - } - if ((max - min) == 0) return 0; - - return Math.max(0d, Math.min(1d, (value - min) / (max - min))); - } } /** @@ -927,7 +898,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { return 1000000; // return a number much greater than any of the other cost } - return scale(0, Math.min(cluster.numRegions, maxMoves), moveCost); + return Cluster.scale(0, Math.min(cluster.numRegions, maxMoves), moveCost); } } @@ -1035,15 +1006,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { @Override protected double cost() { - double max = cluster.numRegions; - double min = ((double) cluster.numRegions) / cluster.numServers; - double value = 0; - - for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) { - value += cluster.numMaxRegionsPerTable[i]; - } - - return scale(min, max, value); + return Cluster.scale(cluster.minRegionSkewByTable, cluster.maxRegionSkewByTable, cluster.regionSkewByTable); } } @@ -1366,7 +1329,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer { for (int i = 0 ; i < costsPerGroup.length; i++) { totalCost += costsPerGroup[i]; } - return scale(0, maxCost, totalCost); + return Cluster.scale(0, maxCost, totalCost); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java index fdac676..f82a96a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestBaseLoadBalancer.java @@ -390,8 +390,8 @@ public class TestBaseLoadBalancer extends BalancerTestBase { // now move region1 from servers[0] to servers[2] cluster.doAction(new MoveRegionAction(0, 0, 2)); - // check that the numMaxRegionsPerTable for "table" has increased to 2 - assertEquals(2, cluster.numMaxRegionsPerTable[0]); + // check that the regionSkewByTable for "table" has increased to 2 + assertEquals(2, cluster.regionSkewByTable, 0.01); // now repeat check whether moving region1 from servers[1] to servers[2] // would lower availability assertTrue(cluster.wouldLowerAvailability(hri1, servers[2]));