http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java new file mode 100644 index 0000000..9d3968b --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/BalancingStrategy.java @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.text.MessageFormat; +import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.location.Location; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Represents an abstract algorithm for optimally balancing worker "items" among several "containers" based on the workloads + * of the items, and corresponding high- and low-thresholds on the containers. 
+ * + * TODO: extract interface, provide default implementation + * TODO: remove legacy code comments + */ +public class BalancingStrategy<NodeType extends Entity, ItemType extends Movable> { + + // FIXME Bad idea to use MessageFormat.format in this way; if toString of entity contains + // special characters interpreted by MessageFormat, then it will all break! + + // This is a modified version of the watermark elasticity policy from Monterey v3. + + private static final Logger LOG = LoggerFactory.getLogger(BalancingStrategy.class); + + private static final int MAX_MIGRATIONS_PER_BALANCING_NODE = 20; // arbitrary (Splodge) + private static final boolean BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES = false; + + private final String name; + private final BalanceablePoolModel<NodeType, ItemType> model; + private final PolicyUtilForPool<NodeType, ItemType> helper; +// private boolean loggedColdestTooHigh = false; +// private boolean loggedHottestTooLow = false; + + + public BalancingStrategy(String name, BalanceablePoolModel<NodeType, ItemType> model) { + this.name = name; + this.model = model; + this.helper = new PolicyUtilForPool<NodeType, ItemType>(model); + } + + public String getName() { + return name; + } + + public void rebalance() { + checkAndApplyOn(model.getPoolContents()); + } + + public int getMaxMigrationsPerBalancingNode() { + return MAX_MIGRATIONS_PER_BALANCING_NODE; + } + + public BalanceablePoolModel<NodeType, ItemType> getDataProvider() { + return model; + } + + // This was the entry point for the legacy policy. 
+ private void checkAndApplyOn(final Collection<NodeType> dirtyNodesSupplied) { + Collection<NodeType> dirtyNodes = dirtyNodesSupplied; + +// if (startTime + FORCE_ALL_NODES_IF_DELAYED_FOR_MILLIS < System.currentTimeMillis()) { +// Set<NodeType> allNodes = new LinkedHashSet<NodeType>(); +// allNodes.addAll(dirtyNodes); +// dirtyNodes = allNodes; +// allNodes.addAll(getDataProvider().getPoolContents()); +// if (LOG.isDebugEnabled()) +// LOG.debug("policy "+getDataProvider().getAbbr()+" running after delay ("+ +// TimeUtils.makeTimeString(System.currentTimeMillis() - startTime)+", analysing all nodes: "+ +// dirtyNodes); +// } + +// nodeFinder.optionalCachedNodesWithBacklogDetected.clear(); +// boolean gonnaGrow = growPool(dirtyNodes); +// getDataProvider().waitForAllTransitionsComplete(); + boolean gonnaGrow = false; + + Set<NodeType> nonFrozenDirtyNodes = new LinkedHashSet<NodeType>(dirtyNodes); +// boolean gonnaShrink = false; +// if (!gonnaGrow && !DONT_SHRINK_UNLESS_BALANCED) { +// gonnaShrink = shrinkPool(nonFrozenDirtyNodes); +// getDataProvider().waitForAllTransitionsComplete(); +// } + + if (getDataProvider().getPoolSize() >= 2) { + boolean didBalancing = false; + for (NodeType a : nonFrozenDirtyNodes) { + didBalancing |= balanceItemsOnNodesInQuestion(a, gonnaGrow); +// getMutator().waitForAllTransitionsComplete(); + } + if (didBalancing) { + return; + } + } + +// if (!gonnaGrow && DONT_SHRINK_UNLESS_BALANCED) { +// gonnaShrink = shrinkPool(nonFrozenDirtyNodes); +// getDataProvider().waitForAllTransitionsComplete(); +// } + +// if (gonnaGrow || gonnaShrink) +// //don't log 'doing nothing' message +// return; + +// if (LOG.isDebugEnabled()) { +// double poolTotal = getDataProvider().getPoolPredictedWorkrateTotal(); +// int poolSize = getDataProvider().getPoolPredictedSize(); +// LOG.debug(MessageFormat.format("policy "+getDataProvider().getAbbr()+" did nothing; pool workrate is {0,number,#.##} x {1} nodes", +// 1.0*poolTotal/poolSize, poolSize)); +// } + } + 
+ protected boolean balanceItemsOnNodesInQuestion(NodeType questionedNode, boolean gonnaGrow) { + double questionedNodeTotalWorkrate = getDataProvider().getTotalWorkrate(questionedNode); + + boolean balanced = balanceItemsOnHotNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow); +// getMutator().waitForAllTransitionsComplete(); + + if (!balanced || BALANCE_COLD_PULLS_IN_SAME_RUN_AS_HOT_PUSHES) { + balanced |= balanceItemsOnColdNode(questionedNode, questionedNodeTotalWorkrate, gonnaGrow); +// getMutator().waitForAllTransitionsComplete(); + } + if (balanced) + return true; + + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing "+questionedNode+"; " + + "its workrate {0,number,#.##} is acceptable (or cannot be balanced)", questionedNodeTotalWorkrate) ); + } + return false; + } + + protected boolean balanceItemsOnHotNode(NodeType node, double nodeWorkrate, boolean gonnaGrow) { + double originalNodeWorkrate = nodeWorkrate; + int migrationCount = 0; + int iterationCount = 0; + + Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>(); + Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>(); + +// if (nodeFinder.config.COUNT_BACKLOG_AS_EXTRA_WORKRATE) { +// int backlog = nodeFinder.getBacklogQueueLength(questionedNode); +// if (backlog>0) { +// Level l = backlog>1000 ? Level.INFO : backlog>10 ? 
Level.FINE : Level.FINER; +// if (LOG.isLoggable(l)) { +// LOG.log( l, MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" detected queue at node "+questionedNode+", " + +// "inflating workrate {0,number,#.##} + "+backlog, questionedNodeTotalWorkrate) ); +// } +// questionedNodeTotalWorkrate += backlog; +// } +// } + + Double highThreshold = model.getHighThreshold(node); + if (highThreshold == -1) { + // node presumably has been removed; TODO log + return false; + } + + while (nodeWorkrate > highThreshold && migrationCount < getMaxMigrationsPerBalancingNode()) { + iterationCount++; + + if (LOG.isDebugEnabled()) { + LOG.debug(MessageFormat.format( + "policy "+getDataProvider().getName()+" considering balancing hot node "+node+" " + + "(workrate {0,number,#.##}); iteration "+iterationCount, nodeWorkrate) ); + } + + // move from hot node, to coldest + + NodeType coldNode = helper.findColdestContainer(nodesChecked); + + if (coldNode == null) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " + + "(workrate {0,number,#.##}); no coldest node available", nodeWorkrate) ); + } + break; + } + + if (coldNode.equals(node)) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing hot node "+node+" " + + "(workrate {0,number,#.##}); it is also the coldest modifiable node", nodeWorkrate) ); + } + break; + } + + double coldNodeWorkrate = getDataProvider().getTotalWorkrate(coldNode); + boolean emergencyLoadBalancing = coldNodeWorkrate < nodeWorkrate*2/3; + double coldNodeHighThreshold = model.getHighThreshold(coldNode); + if (coldNodeWorkrate >= coldNodeHighThreshold && !emergencyLoadBalancing) { + //don't balance if all nodes are approx equally hot (and very hot) + + //for now, stop warning if it is a recurring theme! +// Level level = loggedColdestTooHigh ? 
Level.FINER : Level.INFO; +// LOG.log(level, MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " + +// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high"+ +// (loggedColdestTooHigh ? "" : " (future cases will be logged at finer)"), +// questionedNodeTotalWorkrate, coldNodeWorkrate) ); +// loggedColdestTooHigh = true; + break; + } + double poolLowWatermark = Double.MAX_VALUE; // TODO + if (gonnaGrow && (coldNodeWorkrate >= poolLowWatermark && !emergencyLoadBalancing)) { + //if we're growing the pool, refuse to balance unless the cold node is indeed very cold, or hot node very hot + + //for now, stop warning if it is a recurring theme! +// Level level = loggedColdestTooHigh ? Level.FINER : Level.INFO; +// LOG.log(level, MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" not balancing hot node "+questionedNode+" " + +// "(workrate {0,number,#.##}); coldest node "+coldNode+" has workrate {1,number,#.##} also too high to accept while pool is growing" + +// (loggedColdestTooHigh ? 
"" : " (future cases will be logged at finer)"), +// questionedNodeTotalWorkrate, coldNodeWorkrate) ); +// loggedColdestTooHigh = true; + break; + } + + String questionedNodeName = getDataProvider().getName(node); + String coldNodeName = getDataProvider().getName(coldNode); + Location coldNodeLocation = getDataProvider().getLocation(coldNode); + + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " + + "("+node+", workrate {0,number,#.##}), " + + "considering target "+coldNodeName+" ("+coldNode+", workrate {1,number,#.##})", + nodeWorkrate, coldNodeWorkrate) ); + } + + double idealSizeToMove = (nodeWorkrate - coldNodeWorkrate) / 2; + //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount + + if (idealSizeToMove + coldNodeWorkrate > coldNodeHighThreshold) + idealSizeToMove = coldNodeHighThreshold - coldNodeWorkrate; + + + double maxSizeToMoveIdeally = Math.min( + nodeWorkrate/2 + 0.00001, + //permit it to exceed node high if there is no alternative (this is 'max' not 'ideal'), + //so long as it still gives a significant benefit + // getConfiguration().nodeHighWaterMark - coldNodeWorkrate, + (nodeWorkrate - coldNodeWorkrate)*0.9); + double maxSizeToMoveIfNoSmallButLarger = nodeWorkrate*3/4; + + Map<ItemType, Double> questionedNodeItems = getDataProvider().getItemWorkrates(node); + if (questionedNodeItems == null) { + if (LOG.isDebugEnabled()) + LOG.debug(MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " + + "("+node+", workrate {0,number,#.##}), abandoned; " + + "item report for " + questionedNodeName + " unavailable", + nodeWorkrate)); + break; + } + ItemType itemToMove = findBestItemToMove(questionedNodeItems, idealSizeToMove, maxSizeToMoveIdeally, + maxSizeToMoveIfNoSmallButLarger, itemsMoved, coldNodeLocation); + + if (itemToMove == null) { + if (LOG.isDebugEnabled()) + 
LOG.debug(MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing hot node "+questionedNodeName+" " + + "("+node+", workrate {0,number,#.##}), ending; " + + "no suitable segment found " + + "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " + + "moving to coldest node "+coldNodeName+" ("+coldNode+", workrate {3,number,#.##}); available items: {4}", + nodeWorkrate, idealSizeToMove, maxSizeToMoveIdeally, coldNodeWorkrate, questionedNodeItems) ); + break; + } + + itemsMoved.add(itemToMove); + double itemWorkrate = questionedNodeItems.get(itemToMove); + +// if (LOG.isLoggable(Level.FINE)) +// LOG.fine( MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" balancing hot node "+questionedNodeName+" " + +// "(workrate {0,number,#.##}, too high), transitioning " + itemToMove + +// " to "+coldNodeName+" (workrate {1,number,#.##}, now += {2,number,#.##})", +// questionedNodeTotalWorkrate, coldNodeWorkrate, segmentRate) ); + + nodeWorkrate -= itemWorkrate; + coldNodeWorkrate += itemWorkrate; + + moveItem(itemToMove, node, coldNode); + ++migrationCount; + } + + if (LOG.isDebugEnabled()) { + if (iterationCount == 0) { + if (LOG.isTraceEnabled()) + LOG.trace( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing if hot finished at node "+node+"; " + + "workrate {0,number,#.##} not hot", + originalNodeWorkrate) ); + } + else if (itemsMoved.isEmpty()) { + if (LOG.isTraceEnabled()) + LOG.trace( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+" " + + "(workrate {0,number,#.##}); no way to improve it", + originalNodeWorkrate) ); + } else { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing finished at hot node "+node+"; " + + "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " + + "by moving off {3}", + originalNodeWorkrate, + nodeWorkrate, + getDataProvider().getTotalWorkrate(node), + 
itemsMoved + ) ); + } + } + return !itemsMoved.isEmpty(); + } + + protected boolean balanceItemsOnColdNode(NodeType questionedNode, double questionedNodeTotalWorkrate, boolean gonnaGrow) { + // Abort if the node has pending adjustments. + Map<ItemType, Double> items = getDataProvider().getItemWorkrates(questionedNode); + if (items == null) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " + + "(workrate {0,number,#.##}); workrate breakdown unavailable (probably reverting)", + questionedNodeTotalWorkrate) ); + } + return false; + } + for (ItemType item : items.keySet()) { + if (!model.isItemMoveable(item)) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " + + "(workrate {0,number,#.##}); at least one item ("+item+") is in flux", + questionedNodeTotalWorkrate) ); + } + return false; + } + } + + double originalQuestionedNodeTotalWorkrate = questionedNodeTotalWorkrate; + int numMigrations = 0; + + Set<ItemType> itemsMoved = new LinkedHashSet<ItemType>(); + Set<NodeType> nodesChecked = new LinkedHashSet<NodeType>(); + + int iters = 0; + Location questionedLocation = getDataProvider().getLocation(questionedNode); + + double lowThreshold = model.getLowThreshold(questionedNode); + while (questionedNodeTotalWorkrate < lowThreshold) { + iters++; + + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" considering balancing cold node "+questionedNode+" " + + "(workrate {0,number,#.##}); iteration "+iters, questionedNodeTotalWorkrate)); + } + + // move from cold node, to hottest + + NodeType hotNode = helper.findHottestContainer(nodesChecked); + + if (hotNode == null) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " + 
+ "(workrate {0,number,#.##}); no hottest node available", questionedNodeTotalWorkrate) ); + } + + break; + } + if (hotNode.equals(questionedNode)) { + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" not balancing cold node "+questionedNode+" " + + "(workrate {0,number,#.##}); it is also the hottest modfiable node", questionedNodeTotalWorkrate) ); + } + break; + } + + + double hotNodeWorkrate = getDataProvider().getTotalWorkrate(hotNode); + double hotNodeLowThreshold = model.getLowThreshold(hotNode); + double hotNodeHighThreshold = model.getHighThreshold(hotNode); + boolean emergencyLoadBalancing = false; //doesn't apply to cold + if (hotNodeWorkrate == -1 || hotNodeLowThreshold == -1 || hotNodeHighThreshold == -1) { + // hotNode presumably has been removed; TODO log + break; + } + if (hotNodeWorkrate <= hotNodeLowThreshold && !emergencyLoadBalancing) { + //don't balance if all nodes are too low + + //for now, stop warning if it is a recurring theme! +// Level level = loggedHottestTooLow ? Level.FINER : Level.INFO; +// LOG.log(level, MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " + +// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low" + +// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"), +// questionedNodeTotalWorkrate, hotNodeWorkrate) ); +// loggedHottestTooLow = true; + break; + } + if (gonnaGrow && (hotNodeWorkrate <= hotNodeHighThreshold && !emergencyLoadBalancing)) { + //if we're growing the pool, refuse to balance unless the hot node is quite hot + + //again, stop warning if it is a recurring theme! +// Level level = loggedHottestTooLow ? 
Level.FINER : Level.INFO; +// LOG.log(level, MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" not balancing cold node "+questionedNode+" " + +// "(workrate {0,number,#.##}); hottest node "+hotNode+" has workrate {1,number,#.##} also too low to accept while pool is growing"+ +// (loggedHottestTooLow ? "" : " (future cases will be logged at finer)"), +// questionedNodeTotalWorkrate, hotNodeWorkrate) ); +// loggedHottestTooLow = true; + break; + } + + String questionedNodeName = getDataProvider().getName(questionedNode); + String hotNodeName = getDataProvider().getName(hotNode); + + if (LOG.isDebugEnabled()) { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " + + "("+questionedNode+", workrate {0,number,#.##}), " + + "considering source "+hotNodeName+" ("+hotNode+", workrate {1,number,#.##})", + questionedNodeTotalWorkrate, hotNodeWorkrate) ); + } + + double idealSizeToMove = (hotNodeWorkrate - questionedNodeTotalWorkrate) / 2; + //if the 'ideal' amount to move would cause cold to be too hot, then reduce ideal amount + double targetNodeHighThreshold = model.getHighThreshold(questionedNode); + if (idealSizeToMove + questionedNodeTotalWorkrate > targetNodeHighThreshold) + idealSizeToMove = targetNodeHighThreshold - questionedNodeTotalWorkrate; + double maxSizeToMoveIdeally = Math.min( + hotNodeWorkrate/2, + //allow to swap order, but not very much (0.9 was allowed when balancing high) + (hotNodeWorkrate - questionedNodeTotalWorkrate)*0.6); + double maxSizeToMoveIfNoSmallButLarger = questionedNodeTotalWorkrate*3/4; + + Map<ItemType, Double> hotNodeItems = getDataProvider().getItemWorkrates(hotNode); + if (hotNodeItems == null) { + if (LOG.isDebugEnabled()) + LOG.debug(MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " + + "("+questionedNode+", workrate {0,number,#.##}), " + + "excluding hot node "+hotNodeName+" because its 
item report unavailable", + questionedNodeTotalWorkrate)); + nodesChecked.add(hotNode); + continue; + } + + ItemType itemToMove = findBestItemToMove(hotNodeItems, idealSizeToMove, maxSizeToMoveIdeally, + maxSizeToMoveIfNoSmallButLarger, itemsMoved, questionedLocation); + if (itemToMove == null) { + if (LOG.isDebugEnabled()) + LOG.debug(MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing cold node "+questionedNodeName+" " + + "("+questionedNode+", workrate {0,number,#.##}), " + + "excluding hot node "+hotNodeName+" because it has no appilcable items " + + "(ideal transition item size {1,number,#.##}, max {2,number,#.##}, " + + "moving from hot node "+hotNodeName+" ("+hotNode+", workrate {3,number,#.##}); available items: {4}", + questionedNodeTotalWorkrate, idealSizeToMove, maxSizeToMoveIdeally, hotNodeWorkrate, hotNodeItems) ); + + nodesChecked.add(hotNode); + continue; + } + + itemsMoved.add(itemToMove); + double segmentRate = hotNodeItems.get(itemToMove); + +// if (LOG.isLoggable(Level.FINE)) +// LOG.fine( MessageFormat.format( +// "policy "+getDataProvider().getAbbr()+" balancing cold node "+questionedNodeName+" " + +// "(workrate {0,number,#.##}, too low), transitioning " + itemToMove + +// " from "+hotNodeName+" (workrate {1,number,#.##}, now -= {2,number,#.##})", +// questionedNodeTotalWorkrate, hotNodeWorkrate, segmentRate) ); + + questionedNodeTotalWorkrate += segmentRate; + hotNodeWorkrate -= segmentRate; + + moveItem(itemToMove, hotNode, questionedNode); + + if (++numMigrations >= getMaxMigrationsPerBalancingNode()) { + break; + } + } + + if (LOG.isDebugEnabled()) { + if (iters == 0) { + if (LOG.isTraceEnabled()) + LOG.trace( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing if cold finished at node "+questionedNode+"; " + + "workrate {0,number,#.##} not cold", + originalQuestionedNodeTotalWorkrate) ); + } + else if (itemsMoved.isEmpty()) { + if (LOG.isTraceEnabled()) + LOG.trace( MessageFormat.format( + 
"policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+" " + + "(workrate {0,number,#.##}); no way to improve it", + originalQuestionedNodeTotalWorkrate) ); + } else { + LOG.debug( MessageFormat.format( + "policy "+getDataProvider().getName()+" balancing finished at cold node "+questionedNode+"; " + + "workrate from {0,number,#.##} to {1,number,#.##} (report now says {2,number,#.##}) " + + "by moving in {3}", + originalQuestionedNodeTotalWorkrate, + questionedNodeTotalWorkrate, + getDataProvider().getTotalWorkrate(questionedNode), + itemsMoved) ); + } + } + return !itemsMoved.isEmpty(); + } + + protected void moveItem(ItemType item, NodeType oldNode, NodeType newNode) { + item.move(newNode); + model.onItemMoved(item, newNode); + } + + /** + * "Best" is defined as nearest to the targetCost, without exceeding maxCost, unless maxCostIfNothingSmallerButLarger > 0 + * which does just that (useful if the ideal and target are estimates and aren't quite right, typically it will take + * something larger than maxRate but less than half the total rate, which is only possible when the estimates don't agree) + */ + protected ItemType findBestItemToMove(Map<ItemType, Double> costsPerItem, double targetCost, double maxCost, + double maxCostIfNothingSmallerButLarger, Set<ItemType> excludedItems, Location locationIfKnown) { + + ItemType closestMatch = null; + ItemType smallestMoveable = null, largest = null; + double minDiff = Double.MAX_VALUE, smallestC = Double.MAX_VALUE, largestC = Double.MIN_VALUE; + boolean exclusions = false; + + for (Entry<ItemType, Double> entry : costsPerItem.entrySet()) { + ItemType item = entry.getKey(); + Double cost = entry.getValue(); + + if (cost == null) { + if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' has null workrate: skipping", item)); + continue; + } + + if (!model.isItemMoveable(item)) { + if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' cannot be moved: 
skipping", item)); + continue; + } + if (cost < 0) { + if (LOG.isDebugEnabled()) LOG.debug(MessageFormat.format("Item ''{0}'' subject to recent adjustment: skipping", item)); + continue; + } + if (excludedItems.contains(item)) { + exclusions = true; + continue; + } + if (cost < 0) { // FIXME: already tested above + exclusions = true; + continue; + } + if (cost <= 0) { // FIXME: overlaps previous condition + continue; + } + if (largest == null || cost > largestC) { + largest = item; + largestC = cost; + } + if (!model.isItemMoveable(item)) { // FIXME: already tested above + continue; + } + if (locationIfKnown != null && !model.isItemAllowedIn(item, locationIfKnown)) { + continue; + } + if (smallestMoveable == null || cost < smallestC) { + smallestMoveable = item; + smallestC = cost; + } + if (cost > maxCost) { + continue; + } + double diff = Math.abs(targetCost - cost); + if (closestMatch == null || diff < minDiff) { + closestMatch = item; + minDiff = diff; + } + } + + if (closestMatch != null) + return closestMatch; + + if (smallestC < maxCostIfNothingSmallerButLarger && smallestC < largestC && !exclusions) + return smallestMoveable; + + return null; + } + +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java new file mode 100644 index 0000000..5f6cabc --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/DefaultBalanceablePoolModel.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.brooklyn.api.location.Location; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; + +/** + * Standard implementation of {@link BalanceablePoolModel}, providing essential arithmetic for item and container + * workrates and thresholds. See subclasses for specific requirements for migrating items. + */ +public class DefaultBalanceablePoolModel<ContainerType, ItemType> implements BalanceablePoolModel<ContainerType, ItemType> { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultBalanceablePoolModel.class); + + /* + * Performance comments. + * - Used hprof with LoadBalancingPolicySoakTest.testLoadBalancingManyManyItemsTest (1000 items) + * - Prior to adding containerToItems, it created a new set by iterating over all items. + * This was the biggest percentage of any brooklyn code. + * Hence it's worth duplicating the values, keyed by item and keyed by container. + * - Unfortunately changing threading model (so have a "rebalancer" thread, and a thread that + * processes events to update the model), get ConcurrentModificationException if don't take + * copy of containerToItems.get(node)... 
+ */ + + // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item + private static final String NULL_CONTAINER = "null-container"; + + private final String name; + private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>()); + private final Map<ContainerType, Double> containerToLowThreshold = new ConcurrentHashMap<ContainerType, Double>(); + private final Map<ContainerType, Double> containerToHighThreshold = new ConcurrentHashMap<ContainerType, Double>(); + private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>(); + private final SetMultimap<ContainerType, ItemType> containerToItems = Multimaps.synchronizedSetMultimap(HashMultimap.<ContainerType, ItemType>create()); + private final Map<ItemType, Double> itemToWorkrate = new ConcurrentHashMap<ItemType, Double>(); + private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>()); + + private volatile double poolLowThreshold = 0; + private volatile double poolHighThreshold = 0; + private volatile double currentPoolWorkrate = 0; + + public DefaultBalanceablePoolModel(String name) { + this.name = name; + } + + public ContainerType getParentContainer(ItemType item) { + ContainerType result = itemToContainer.get(item); + return (result != NULL_CONTAINER) ? result : null; + } + + public Set<ItemType> getItemsForContainer(ContainerType node) { + Set<ItemType> result = containerToItems.get(node); + synchronized (containerToItems) { + return (result != null) ? 
ImmutableSet.copyOf(result) : Collections.<ItemType>emptySet(); + } + } + + public Double getItemWorkrate(ItemType item) { + return itemToWorkrate.get(item); + } + + @Override public double getPoolLowThreshold() { return poolLowThreshold; } + @Override public double getPoolHighThreshold() { return poolHighThreshold; } + @Override public double getCurrentPoolWorkrate() { return currentPoolWorkrate; } + @Override public boolean isHot() { return !containers.isEmpty() && currentPoolWorkrate > poolHighThreshold; } + @Override public boolean isCold() { return !containers.isEmpty() && currentPoolWorkrate < poolLowThreshold; } + + + // Provider methods. + + @Override public String getName() { return name; } + @Override public int getPoolSize() { return containers.size(); } + @Override public Set<ContainerType> getPoolContents() { return containers; } + @Override public String getName(ContainerType container) { return container.toString(); } // TODO: delete? + @Override public Location getLocation(ContainerType container) { return null; } // TODO? + + @Override public double getLowThreshold(ContainerType container) { + Double result = containerToLowThreshold.get(container); + return (result != null) ? result : -1; + } + + @Override public double getHighThreshold(ContainerType container) { + Double result = containerToHighThreshold.get(container); + return (result != null) ? 
result : -1; + } + + @Override public double getTotalWorkrate(ContainerType container) { + double totalWorkrate = 0; + for (ItemType item : getItemsForContainer(container)) { + Double workrate = itemToWorkrate.get(item); + if (workrate != null) + totalWorkrate += Math.abs(workrate); + } + return totalWorkrate; + } + + @Override public Map<ContainerType, Double> getContainerWorkrates() { + Map<ContainerType, Double> result = new LinkedHashMap<ContainerType, Double>(); + for (ContainerType node : containers) + result.put(node, getTotalWorkrate(node)); + return result; + } + + @Override public Map<ItemType, Double> getItemWorkrates(ContainerType node) { + Map<ItemType, Double> result = new LinkedHashMap<ItemType, Double>(); + for (ItemType item : getItemsForContainer(node)) + result.put(item, itemToWorkrate.get(item)); + return result; + } + + @Override public boolean isItemMoveable(ItemType item) { + // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable? + return itemToContainer.containsKey(item) && !immovableItems.contains(item); + } + + @Override public boolean isItemAllowedIn(ItemType item, Location location) { + return true; // TODO? + } + + + // Mutators. + + @Override + public void onItemMoved(ItemType item, ContainerType newNode) { + if (!itemToContainer.containsKey(item)) { + // Item may have been deleted; order of events received from different sources + // (i.e. item itself and for itemGroup membership) is non-deterministic. 
+ LOG.info("Balanceable pool model ignoring onItemMoved for unknown item {} to container {}; " + + "if onItemAdded subsequently received will get new container then", item, newNode); + return; + } + ContainerType newNodeNonNull = toNonNullContainer(newNode); + ContainerType oldNode = itemToContainer.put(item, newNodeNonNull); + if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item); + if (newNode != null) containerToItems.put(newNode, item); + } + + @Override + public void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold) { + boolean added = containers.add(newContainer); + if (!added) { + // See LoadBalancingPolicy.onContainerAdded for possible explanation of why can get duplicate calls + LOG.debug("Duplicate container-added event for {}; ignoring", newContainer); + return; + } + containerToLowThreshold.put(newContainer, lowThreshold); + containerToHighThreshold.put(newContainer, highThreshold); + poolLowThreshold += lowThreshold; + poolHighThreshold += highThreshold; + } + + @Override + public void onContainerRemoved(ContainerType oldContainer) { + containers.remove(oldContainer); + Double containerLowThreshold = containerToLowThreshold.remove(oldContainer); + Double containerHighThresold = containerToHighThreshold.remove(oldContainer); + poolLowThreshold -= (containerLowThreshold != null ? containerLowThreshold : 0); + poolHighThreshold -= (containerHighThresold != null ? containerHighThresold : 0); + + // TODO: assert no orphaned items + } + + @Override + public void onItemAdded(ItemType item, ContainerType parentContainer) { + onItemAdded(item, parentContainer, false); + } + + @Override + public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable) { + // Duplicate calls to onItemAdded do no harm, as long as most recent is most accurate! + // Important that it stays that way for now - See LoadBalancingPolicy.onContainerAdded for explanation. 
+ + if (immovable) + immovableItems.add(item); + + ContainerType parentContainerNonNull = toNonNullContainer(parentContainer); + ContainerType oldNode = itemToContainer.put(item, parentContainerNonNull); + if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item); + if (parentContainer != null) containerToItems.put(parentContainer, item); + } + + @Override + public void onItemRemoved(ItemType item) { + ContainerType oldNode = itemToContainer.remove(item); + if (oldNode != null && oldNode != NULL_CONTAINER) containerToItems.remove(oldNode, item); + Double workrate = itemToWorkrate.remove(item); + if (workrate != null) + currentPoolWorkrate -= workrate; + immovableItems.remove(item); + } + + @Override + public void onItemWorkrateUpdated(ItemType item, double newValue) { + if (hasItem(item)) { + Double oldValue = itemToWorkrate.put(item, newValue); + double delta = ( newValue - (oldValue != null ? oldValue : 0) ); + currentPoolWorkrate += delta; + } else { + // Can happen when item removed - get notification of removal and workrate from group and item + // respectively, so can overtake each other + if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of workrate for unknown item {}, to {}", item, newValue); + } + } + + private boolean hasItem(ItemType item) { + return itemToContainer.containsKey(item); + } + + + // Additional methods for tests. + + /** + * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers. 
+ */ + @VisibleForTesting + public String itemDistributionToString() { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + dumpItemDistribution(new PrintStream(baos)); + return new String(baos.toByteArray()); + } + + @VisibleForTesting + public void dumpItemDistribution() { + dumpItemDistribution(System.out); + } + + @VisibleForTesting + public void dumpItemDistribution(PrintStream out) { + for (ContainerType container : getPoolContents()) { + out.println("Container '"+container+"': "); + for (ItemType item : getItemsForContainer(container)) { + Double workrate = getItemWorkrate(item); + out.println("\t"+"Item '"+item+"' ("+workrate+")"); + } + } + out.flush(); + } + + @SuppressWarnings("unchecked") + private ContainerType nullContainer() { + return (ContainerType) NULL_CONTAINER; // relies on erasure + } + + private ContainerType toNonNullContainer(ContainerType container) { + return (container != null) ? container : nullContainer(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java new file mode 100644 index 0000000..2538bc6 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroup.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.policy.loadbalancing;

import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.entity.Group;
import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
import org.apache.brooklyn.core.util.flags.SetFromFlag;

import brooklyn.config.ConfigKey;
import brooklyn.entity.basic.DynamicGroup;
import brooklyn.event.basic.BasicConfigKey;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;

/**
 * A group of items that are contained within a given (dynamically changing) set of containers.
 *
 * The {@link #setContainers(Group)} method sets the group of containers. The membership of that group
 * is dynamically tracked.
 *
 * When containers are added/removed, or when an item is added/removed, or when a {@link Movable} item
 * is moved then the membership of this group of items is automatically updated accordingly.
 *
 * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues.
 */
@ImplementedBy(ItemsInContainersGroupImpl.class)
public interface ItemsInContainersGroup extends DynamicGroup {

    /**
     * Filter for which items within the containers will automatically be in the group;
     * defaults to {@link Predicates#alwaysTrue()}, i.e. every item is eligible.
     */
    // The key is built through the raw BasicConfigKey constructor (only Predicate.class is
    // available as a class literal), so the unchecked conversion is acknowledged explicitly.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    @SetFromFlag("itemFilter")
    public static final ConfigKey<Predicate<? super Entity>> ITEM_FILTER = new BasicConfigKey(
            Predicate.class, "itemsInContainerGroup.itemFilter", "Filter for which items within the containers will automatically be in group", Predicates.alwaysTrue());

    /**
     * Sets the group whose members are the containers to track.
     *
     * @param containerGroup the group of containers
     */
    public void setContainers(Group containerGroup);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java new file mode 100644 index 0000000..225a2b6 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/ItemsInContainersGroupImpl.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.loadbalancing; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.Group; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.AbstractGroup; +import brooklyn.entity.basic.DynamicGroupImpl; + +import com.google.common.base.Predicate; + +/** + * A group of items that are contained within a given (dynamically changing) set of containers. + * + * The {@link #setContainers(Group)} method sets the group of containers. The membership of that group + * is dynamically tracked. + * + * When containers are added/removed, or when an item is added/removed, or when a {@link Movable} item + * is moved then the membership of this group of items is automatically updated accordingly. + * + * For example: in Monterey, this could be used to track the actors that are within a given cluster of venues. 
+ */ +public class ItemsInContainersGroupImpl extends DynamicGroupImpl implements ItemsInContainersGroup { + + // TODO Inefficient: will not scale to many 1000s of items + + private static final Logger LOG = LoggerFactory.getLogger(ItemsInContainersGroup.class); + + private Group containerGroup; + + private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() { + @Override + public void onEvent(SensorEvent<Object> event) { + Entity source = event.getSource(); + Object value = event.getValue(); + Sensor sensor = event.getSensor(); + + if (sensor.equals(AbstractGroup.MEMBER_ADDED)) { + onContainerAdded((Entity) value); + } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) { + onContainerRemoved((Entity) value); + } else if (sensor.equals(Movable.CONTAINER)) { + onItemMoved((Movable)source, (BalanceableContainer<?>) value); + } else { + throw new IllegalStateException("Unhandled event type "+sensor+": "+event); + } + } + }; + + public ItemsInContainersGroupImpl() { + } + + @Override + public void init() { + super.init(); + setEntityFilter(new Predicate<Entity>() { + @Override public boolean apply(Entity e) { + return acceptsEntity(e); + }}); + } + + protected Predicate<? super Entity> getItemFilter() { + return getConfig(ITEM_FILTER); + } + + @Override + protected boolean acceptsEntity(Entity e) { + if (e instanceof Movable) { + return acceptsItem((Movable)e, ((Movable)e).getAttribute(Movable.CONTAINER)); + } + return false; + } + + boolean acceptsItem(Movable e, BalanceableContainer c) { + return (containerGroup != null && c != null) ? 
getItemFilter().apply(e) && containerGroup.hasMember(c) : false; + } + + @Override + public void setContainers(Group containerGroup) { + this.containerGroup = containerGroup; + subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler); + subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler); + subscribe(null, Movable.CONTAINER, eventHandler); + + if (LOG.isTraceEnabled()) LOG.trace("{} scanning entities on container group set", this); + rescanEntities(); + } + + private void onContainerAdded(Entity newContainer) { + if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} added", this, newContainer); + rescanEntities(); + } + + private void onContainerRemoved(Entity oldContainer) { + if (LOG.isTraceEnabled()) LOG.trace("{} rescanning entities on container {} removed", this, oldContainer); + rescanEntities(); + } + + protected void onEntityAdded(Entity item) { + if (acceptsEntity(item)) { + if (LOG.isDebugEnabled()) LOG.debug("{} adding new item {}", this, item); + addMember(item); + } + } + + protected void onEntityRemoved(Entity item) { + if (removeMember(item)) { + if (LOG.isDebugEnabled()) LOG.debug("{} removing deleted item {}", this, item); + } + } + + private void onItemMoved(Movable item, BalanceableContainer container) { + if (LOG.isTraceEnabled()) LOG.trace("{} processing moved item {}, to container {}", new Object[] {this, item, container}); + if (hasMember(item)) { + if (!acceptsItem(item, container)) { + if (LOG.isDebugEnabled()) LOG.debug("{} removing moved item {} from group, as new container {} is not a member", new Object[] {this, item, container}); + removeMember(item); + } + } else { + if (acceptsItem(item, container)) { + if (LOG.isDebugEnabled()) LOG.debug("{} adding moved item {} to group, as new container {} is a member", new Object[] {this, item, container}); + addMember(item); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java new file mode 100644 index 0000000..f437fc8 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LoadBalancingPolicy.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.loadbalancing; + +import static brooklyn.util.JavaGroovyEquivalents.elvis; +import static brooklyn.util.JavaGroovyEquivalents.groovyTruth; + +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.entity.basic.EntityLocal; +import org.apache.brooklyn.api.event.AttributeSensor; +import org.apache.brooklyn.api.event.Sensor; +import org.apache.brooklyn.api.event.SensorEvent; +import org.apache.brooklyn.api.event.SensorEventListener; +import org.apache.brooklyn.core.policy.basic.AbstractPolicy; +import org.apache.brooklyn.core.util.flags.SetFromFlag; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.basic.EntityInternal; + +import org.apache.brooklyn.policy.autoscaling.AutoScalerPolicy; + +import brooklyn.util.collections.MutableMap; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + + +/** + * <p>Policy that is attached to a pool of "containers", each of which can host one or more migratable "items". + * The policy monitors the workrates of the items and effects migrations in an attempt to ensure that the containers + * are all sufficiently utilized without any of them being overloaded. + * + * <p>The particular sensor that defines the items' workrates is specified when the policy is constructed. 
High- and
 * low-thresholds are defined as <strong>configuration keys</strong> on each of the container entities in the pool:
 * for an item sensor named {@code foo.bar.sensorName}, the corresponding container config keys would be named
 * {@code foo.bar.sensorName.threshold.low} and {@code foo.bar.sensorName.threshold.high}.
 *
 * <p>In addition to balancing items among the available containers, this policy causes the pool Entity to emit
 * {@code POOL_COLD} and {@code POOL_HOT} events when it is determined that there is a surplus or shortfall
 * of container resource in the pool respectively. These events may be consumed by a separate policy that is capable
 * of resizing the container pool.
 */
 // removed from catalog because it cannot currently be configured via catalog mechanisms
 // PolicySpec.create fails due to no no-arg constructor
 // TODO make metric and model things which can be initialized from config then reinstate in catalog
//@Catalog(name="Load Balancer", description="Policy that is attached to a pool of \"containers\", each of which "
//        + "can host one or more migratable \"items\". The policy monitors the workrates of the items and effects "
//        + "migrations in an attempt to ensure that the containers are all sufficiently utilized without any of "
//        + "them being overloaded.")
public class LoadBalancingPolicy<NodeType extends Entity, ItemType extends Movable> extends AbstractPolicy {

    private static final Logger LOG = LoggerFactory.getLogger(LoadBalancingPolicy.class);

    /** Minimum interval, in milliseconds, between the start of two rebalancing runs. */
    @SetFromFlag(defaultVal="100")
    private long minPeriodBetweenExecs;

    private final AttributeSensor<? extends Number> metric;
    private final String lowThresholdConfigKeyName;
    private final String highThresholdConfigKeyName;
    private final BalanceablePoolModel<NodeType, ItemType> model;
    private final BalancingStrategy<NodeType, ItemType> strategy;
    private BalanceableWorkerPool poolEntity;

    private volatile ScheduledExecutorService executor;
    // True while a rebalance task is queued but has not yet started; prevents piling up tasks.
    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
    // Wall-clock time (ms) at which the most recent rebalance task started.
    private volatile long executorTime = 0;

    private int lastEmittedDesiredPoolSize = 0;
    private static enum TemperatureStates { COLD, HOT }
    private TemperatureStates lastEmittedPoolTemperature = null; // "cold" or "hot"

    /** Dispatches pool and item events to the matching {@code on...} handler, each requesting a rebalance. */
    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
        @Override
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public void onEvent(SensorEvent<Object> event) {
            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", LoadBalancingPolicy.this, event);
            Entity source = event.getSource();
            Object value = event.getValue();
            Sensor sensor = event.getSensor();

            if (sensor.equals(metric)) {
                onItemMetricUpdate((ItemType)source, ((Number) value).doubleValue(), true);
            } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_ADDED)) {
                onContainerAdded((NodeType) value, true);
            } else if (sensor.equals(BalanceableWorkerPool.CONTAINER_REMOVED)) {
                onContainerRemoved((NodeType) value, true);
            } else if (sensor.equals(BalanceableWorkerPool.ITEM_ADDED)) {
                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
                onItemAdded((ItemType)pair.item, (NodeType)pair.container, true);
            } else if (sensor.equals(BalanceableWorkerPool.ITEM_REMOVED)) {
                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
                onItemRemoved((ItemType)pair.item, (NodeType)pair.container, true);
            } else if (sensor.equals(BalanceableWorkerPool.ITEM_MOVED)) {
                BalanceableWorkerPool.ContainerItemPair pair = (BalanceableWorkerPool.ContainerItemPair) value;
                onItemMoved((ItemType)pair.item, (NodeType)pair.container, true);
            }
        }
    };

    /**
     * Delegates to the two-argument constructor with a null metric and model; because a metric
     * is mandatory this currently always fails with a {@link NullPointerException}.
     */
    public LoadBalancingPolicy() {
        this(null, null);
    }

    public LoadBalancingPolicy(AttributeSensor<? extends Number> metric,
            BalanceablePoolModel<NodeType, ItemType> model) {
        this(MutableMap.of(), metric, model);
    }

    /**
     * @param props  flags passed to the superclass constructor
     * @param metric the item sensor that defines workrate; must not be null
     * @param model  the pool model that is kept up to date and handed to the balancing strategy
     * @throws NullPointerException if {@code metric} is null
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public LoadBalancingPolicy(Map props, AttributeSensor<? extends Number> metric,
            BalanceablePoolModel<NodeType, ItemType> model) {

        super(props);
        // Fail with a named message rather than an anonymous NPE from metric.getName() below.
        this.metric = Preconditions.checkNotNull(metric, "metric must not be null");
        this.lowThresholdConfigKeyName = metric.getName()+".threshold.low";
        this.highThresholdConfigKeyName = metric.getName()+".threshold.high";
        this.model = model;
        this.strategy = new BalancingStrategy(getDisplayName(), model); // TODO: extract interface, inject impl

        // TODO Should re-use the execution manager's thread pool, somehow
        executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
    }

    /**
     * Attaches the policy to a {@link BalanceableWorkerPool}: subscribes to its container and item
     * events, feeds the already-present containers and items into the model, then schedules a rebalance.
     *
     * @throws IllegalArgumentException if {@code entity} is not a {@link BalanceableWorkerPool}
     */
    @SuppressWarnings("unchecked")
    @Override
    public void setEntity(EntityLocal entity) {
        Preconditions.checkArgument(entity instanceof BalanceableWorkerPool, "Provided entity must be a BalanceableWorkerPool");
        super.setEntity(entity);
        this.poolEntity = (BalanceableWorkerPool) entity;

        // Detect when containers are added to or removed from the pool.
        subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_ADDED, eventHandler);
        subscribe(poolEntity, BalanceableWorkerPool.CONTAINER_REMOVED, eventHandler);
        subscribe(poolEntity, BalanceableWorkerPool.ITEM_ADDED, eventHandler);
        subscribe(poolEntity, BalanceableWorkerPool.ITEM_REMOVED, eventHandler);
        subscribe(poolEntity, BalanceableWorkerPool.ITEM_MOVED, eventHandler);

        // Take heed of any extant containers.
        for (Entity container : poolEntity.getContainerGroup().getMembers()) {
            onContainerAdded((NodeType)container, false);
        }
        for (Entity item : poolEntity.getItemGroup().getMembers()) {
            onItemAdded((ItemType)item, (NodeType)item.getAttribute(Movable.CONTAINER), false);
        }

        scheduleRebalance();
    }

    @Override
    public void suspend() {
        // TODO unsubscribe from everything? And resubscribe on resume?
        super.suspend();
        if (executor != null) executor.shutdownNow();
        executorQueued.set(false);
    }

    @Override
    public void resume() {
        super.resume();
        // Only replace the executor when the current one is unusable; unconditionally creating a
        // new one would orphan a live executor (e.g. the one created by the constructor).
        if (executor == null || executor.isShutdown()) {
            executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
        }
        executorTime = 0;
        executorQueued.set(false);
    }

    private ThreadFactory newThreadFactory() {
        return new ThreadFactoryBuilder()
                .setNameFormat("brooklyn-loadbalancingpolicy-%d")
                .build();
    }

    /**
     * Queues at most one rebalance task, delayed so that consecutive runs start at least
     * {@code minPeriodBetweenExecs} milliseconds apart. Does nothing when the policy is not
     * running or a task is already queued.
     */
    private void scheduleRebalance() {
        if (isRunning() && executorQueued.compareAndSet(false, true)) {
            long now = System.currentTimeMillis();
            long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);

            executor.schedule(new Runnable() {
                @Override
                public void run() {
                    rebalanceAndEmitPoolTemperature();
                }},
                delay,
                TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Body of the scheduled task: runs the balancing strategy, then emits a pool-cold or pool-hot
     * event on the pool entity if the model reports that state. Exceptions are logged, not propagated.
     */
    @SuppressWarnings("rawtypes")
    private void rebalanceAndEmitPoolTemperature() {
        try {
            executorTime = System.currentTimeMillis();
            // Cleared before rebalancing so that events arriving during the run can queue a follow-up.
            executorQueued.set(false);
            strategy.rebalance();

            if (LOG.isDebugEnabled()) LOG.debug("{} post-rebalance: poolSize={}; workrate={}; lowThreshold={}; "
                    + "highThreshold={}", new Object[] {this, model.getPoolSize(), model.getCurrentPoolWorkrate(),
                    model.getPoolLowThreshold(), model.getPoolHighThreshold()});

            if (model.isCold()) {
                Map eventVal = newPoolStateEventValue();
                ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, eventVal);
                logEmittedTemperature(TemperatureStates.COLD, eventVal);

            } else if (model.isHot()) {
                Map eventVal = newPoolStateEventValue();
                ((EntityLocal)poolEntity).emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, eventVal);
                logEmittedTemperature(TemperatureStates.HOT, eventVal);
            }

        } catch (Exception e) {
            if (isRunning()) {
                LOG.error("Error rebalancing", e);
            } else {
                LOG.debug("Error rebalancing, but no longer running", e);
            }
        }
    }

    /** Snapshots pool size, workrate and thresholds from the model as the payload of a cold/hot event. */
    @SuppressWarnings("rawtypes")
    private Map newPoolStateEventValue() {
        return ImmutableMap.of(
                AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, model.getPoolSize(),
                AutoScalerPolicy.POOL_CURRENT_WORKRATE_KEY, model.getCurrentPoolWorkrate(),
                AutoScalerPolicy.POOL_LOW_THRESHOLD_KEY, model.getPoolLowThreshold(),
                AutoScalerPolicy.POOL_HIGH_THRESHOLD_KEY, model.getPoolHighThreshold());
    }

    /**
     * Logs an emitted temperature event at INFO, but only when the suggested pool size or the
     * temperature differs from the last one logged, to avoid repeating the same line every run.
     */
    @SuppressWarnings("rawtypes")
    private void logEmittedTemperature(TemperatureStates temperature, Map eventVal) {
        if (!LOG.isInfoEnabled()) return;

        double poolThreshold = (temperature == TemperatureStates.COLD)
                ? model.getPoolLowThreshold()
                : model.getPoolHighThreshold();
        // Workrate divided by the average per-container threshold.
        int desiredPoolSize = (int) Math.ceil(model.getCurrentPoolWorkrate() / (poolThreshold/model.getPoolSize()));
        if (desiredPoolSize != lastEmittedDesiredPoolSize || lastEmittedPoolTemperature != temperature) {
            LOG.info("{} emitted {} (suggesting {}): {}", new Object[] {this, temperature, desiredPoolSize, eventVal});
            lastEmittedDesiredPoolSize = desiredPoolSize;
            lastEmittedPoolTemperature = temperature;
        }
    }

    // TODO Can get duplicate onContainerAdded events.
    //      I presume it's because we subscribe and then iterate over the extant containers.
    //      Solution would be for subscription to give you events for existing / current value(s).
    //      Also current impl messes up single-threaded updates model: the setEntity is a different thread than for subscription events.
    private void onContainerAdded(NodeType newContainer, boolean rebalanceNow) {
        Preconditions.checkArgument(newContainer instanceof BalanceableContainer, "Added container must be a BalanceableContainer");
        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {}", this, newContainer);
        // Low and high thresholds for the metric we're interested in are assumed to be present
        // in the container's configuration.
        Number lowThreshold = (Number) findConfigValue(newContainer, lowThresholdConfigKeyName);
        Number highThreshold = (Number) findConfigValue(newContainer, highThresholdConfigKeyName);
        if (lowThreshold == null || highThreshold == null) {
            LOG.warn(
                "Balanceable container '"+newContainer+"' does not define low- and high- threshold configuration keys: '"+
                lowThresholdConfigKeyName+"' and '"+highThresholdConfigKeyName+"', skipping");
            return;
        }

        model.onContainerAdded(newContainer, lowThreshold.doubleValue(), highThreshold.doubleValue());

        // Note: no need to scan the container for items; they will appear via the ITEM_ADDED events.
        // Also, must abide by any item-filters etc defined in the pool.

        if (rebalanceNow) scheduleRebalance();
    }

    /** Returns the value of the first config entry on {@code entity} whose key has the given name, or null. */
    private static Object findConfigValue(Entity entity, String configKeyName) {
        Map<ConfigKey<?>, Object> config = ((EntityInternal)entity).getAllConfig();
        for (Entry<ConfigKey<?>, Object> entry : config.entrySet()) {
            if (configKeyName.equals(entry.getKey().getName()))
                return entry.getValue();
        }
        return null;
    }

    // TODO Receiving duplicates of onContainerRemoved (e.g. when running LoadBalancingInmemorySoakTest)
    private void onContainerRemoved(NodeType oldContainer, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, oldContainer);
        model.onContainerRemoved(oldContainer);
        if (rebalanceNow) scheduleRebalance();
    }

    private void onItemAdded(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
        Preconditions.checkArgument(item instanceof Movable, "Added item "+item+" must implement Movable");
        if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});

        subscribe(item, metric, eventHandler);

        // Update the model, including the current metric value (if any).
        boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
        Number currentValue = item.getAttribute(metric);
        model.onItemAdded(item, parentContainer, immovable);
        if (currentValue != null)
            model.onItemWorkrateUpdated(item, currentValue.doubleValue());

        if (rebalanceNow) scheduleRebalance();
    }

    private void onItemRemoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
        unsubscribe(item);
        model.onItemRemoved(item);
        if (rebalanceNow) scheduleRebalance();
    }

    private void onItemMoved(ItemType item, NodeType parentContainer, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
        model.onItemMoved(item, parentContainer);
        if (rebalanceNow) scheduleRebalance();
    }

    private void onItemMetricUpdate(ItemType item, double newValue, boolean rebalanceNow) {
        if (LOG.isTraceEnabled()) LOG.trace("{} recording metric update for item {}, new value {}", new Object[] {this, item, newValue});
        model.onItemWorkrateUpdated(item, newValue);
        if (rebalanceNow) scheduleRebalance();
    }

    @Override
    public String toString() {
        return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java new file mode 100644 index 0000000..b58b8d2 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/LocationConstraint.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import org.apache.brooklyn.api.location.Location; + +/** + * Temporary stub to resolve dependencies in ported LoadBalancingPolicy. 
+ */ +public class LocationConstraint { + public boolean isPermitted(Location l) { return true; } // TODO +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java new file mode 100644 index 0000000..68ba43d --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/Movable.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.policy.loadbalancing; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.core.util.flags.SetFromFlag; + +import brooklyn.config.ConfigKey; +import brooklyn.entity.annotation.Effector; +import brooklyn.entity.annotation.EffectorParam; +import brooklyn.entity.basic.MethodEffector; +import brooklyn.event.basic.BasicAttributeSensor; +import brooklyn.event.basic.BasicConfigKey; + + +/** + * Represents an item that can be migrated between balanceable containers. + */ +public interface Movable extends Entity { + + @SetFromFlag("immovable") + public static ConfigKey<Boolean> IMMOVABLE = new BasicConfigKey<Boolean>( + Boolean.class, "movable.item.immovable", "Indicates whether this item instance is immovable, so cannot be moved by policies", false); + + public static BasicAttributeSensor<BalanceableContainer> CONTAINER = new BasicAttributeSensor<BalanceableContainer>( + BalanceableContainer.class, "movable.item.container", "The container that this item is on"); + + public static final MethodEffector<Void> MOVE = new MethodEffector<Void>(Movable.class, "move"); + + public String getContainerId(); + + @Effector(description="Moves this entity to the given container") + public void move(@EffectorParam(name="destination") Entity destination); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java new file mode 100644 index 0000000..b5c6a28 --- /dev/null +++ b/policy/src/main/java/org/apache/brooklyn/policy/loadbalancing/PolicyUtilForPool.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.loadbalancing; + +import java.util.Set; + +/** + * Provides conveniences for searching for hot/cold containers in a provided pool model. + * Ported from Monterey v3, with irrelevant bits removed. + */ +public class PolicyUtilForPool<ContainerType, ItemType> { + + private final BalanceablePoolModel<ContainerType, ItemType> model; + + + public PolicyUtilForPool (BalanceablePoolModel<ContainerType, ItemType> model) { + this.model = model; + } + + public ContainerType findColdestContainer(Set<ContainerType> excludedContainers) { + return findColdestContainer(excludedContainers, null); + } + + /** + * Identifies the container with the maximum spare capacity (highThreshold - currentWorkrate), + * returns null if none of the model's nodes has spare capacity. 
+ */ + public ContainerType findColdestContainer(Set<ContainerType> excludedContainers, LocationConstraint locationConstraint) { + double maxSpareCapacity = 0; + ContainerType coldest = null; + + for (ContainerType c : model.getPoolContents()) { + if (excludedContainers.contains(c)) + continue; + if (locationConstraint != null && !locationConstraint.isPermitted(model.getLocation(c))) + continue; + + double highThreshold = model.getHighThreshold(c); + double totalWorkrate = model.getTotalWorkrate(c); + double spareCapacity = highThreshold - totalWorkrate; + + if (highThreshold == -1 || totalWorkrate == -1) { + continue; // container presumably has been removed + } + if (spareCapacity > maxSpareCapacity) { + maxSpareCapacity = spareCapacity; + coldest = c; + } + } + return coldest; + } + + /** + * Identifies the container with the maximum overshoot (currentWorkrate - highThreshold), + * returns null if none of the model's nodes has an overshoot. + */ + public ContainerType findHottestContainer(Set<ContainerType> excludedContainers) { + double maxOvershoot = 0; + ContainerType hottest = null; + + for (ContainerType c : model.getPoolContents()) { + if (excludedContainers.contains(c)) + continue; + + double totalWorkrate = model.getTotalWorkrate(c); + double highThreshold = model.getHighThreshold(c); + double overshoot = totalWorkrate - highThreshold; + + if (highThreshold == -1 || totalWorkrate == -1) { + continue; // container presumably has been removed + } + if (overshoot > maxOvershoot) { + maxOvershoot = overshoot; + hottest = c; + } + } + return hottest; + } + +} \ No newline at end of file
