Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,9 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
-import java.net.Socket;
 import java.net.URI;
 import java.text.DateFormat;
 import java.util.ArrayList;
@@ -37,51 +29,38 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.Formatter;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.base.Preconditions;
+
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
 * The tool is deployed as an application program that can be run by the
@@ -182,667 +161,41 @@ import org.apache.hadoop.util.ToolRunner
 @InterfaceAudience.Private
 public class Balancer {
   static final Log LOG = LogFactory.getLog(Balancer.class);
-  final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*1024*1024*1024L; //2GB
-  private static long WIN_WIDTH = 5400*1000L; // 1.5 hour
-  /** The maximum number of concurrent blocks moves for
-   * balancing purpose at a datanode
-   */
-  public static final int MAX_NUM_CONCURRENT_MOVES = 5;
-  private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5;
-  public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds
-  public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes
-
+  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+
+  private static final long GB = 1L << 30; //1GB
+  private static final long MAX_SIZE_TO_MOVE = 10*GB;
+
   private static final String USAGE = "Usage: java "
       + Balancer.class.getSimpleName()
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-sperated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+ + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]" + + "\tIncludes only the specified datanodes."; - private final NameNodeConnector nnc; + private final Dispatcher dispatcher; private final BalancingPolicy policy; private final double threshold; // all data node lists - private final Collection<Source> overUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<Source> aboveAvgUtilizedDatanodes - = new LinkedList<Source>(); - private final Collection<BalancerDatanode> belowAvgUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - private final Collection<BalancerDatanode> underUtilizedDatanodes - = new LinkedList<BalancerDatanode>(); - - private final Collection<Source> sources - = new HashSet<Source>(); - private final Collection<BalancerDatanode> targets - = new HashSet<BalancerDatanode>(); - - private final Map<Block, BalancerBlock> globalBlockList - = new HashMap<Block, BalancerBlock>(); - private final MovedBlocks movedBlocks = new MovedBlocks(); - /** Map (datanodeUuid -> BalancerDatanodes) */ - private final Map<String, BalancerDatanode> datanodeMap - = new HashMap<String, BalancerDatanode>(); - - private NetworkTopology cluster; - - private final ExecutorService moverExecutor; - private final ExecutorService dispatcherExecutor; - - /* This class keeps track of a scheduled block move */ - private class PendingBlockMove { - private BalancerBlock block; - private Source source; - private BalancerDatanode proxySource; - private BalancerDatanode target; - - /** constructor */ - private PendingBlockMove() { - } - - @Override - public String toString() { - final Block b = block.getBlock(); - return b + " with size=" + b.getNumBytes() + " from " - + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.getDisplayName(); - } - - /* choose a block & a proxy source for this pendingMove - * whose source & target have already been chosen. - * - * Return true if a block and its proxy are chosen; false otherwise - */ - private boolean chooseBlockAndProxy() { - // iterate all source's blocks until find a good one - for (Iterator<BalancerBlock> blocks= - source.getBlockIterator(); blocks.hasNext();) { - if (markMovedIfGoodBlock(blocks.next())) { - blocks.remove(); - return true; - } - } - return false; - } - - /* Return true if the given block is good for the tentative move; - * If it is good, add it to the moved list to marked as "Moved". - * A block is good if - * 1. it is a good candidate; see isGoodBlockCandidate - * 2. 
can find a proxy source that's not busy for this move - */ - private boolean markMovedIfGoodBlock(BalancerBlock block) { - synchronized(block) { - synchronized(movedBlocks) { - if (isGoodBlockCandidate(source, target, block)) { - this.block = block; - if ( chooseProxySource() ) { - movedBlocks.add(block); - if (LOG.isDebugEnabled()) { - LOG.debug("Decided to move " + this); - } - return true; - } - } - } - } - return false; - } - - /* Now we find out source, target, and block, we need to find a proxy - * - * @return true if a proxy is found; otherwise false - */ - private boolean chooseProxySource() { - final DatanodeInfo targetDN = target.getDatanode(); - // if node group is supported, first try add nodes in the same node group - if (cluster.isNodeGroupAware()) { - for (BalancerDatanode loc : block.getLocations()) { - if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - } - // check if there is replica which is on the same rack with the target - for (BalancerDatanode loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - // find out a non-busy replica - for (BalancerDatanode loc : block.getLocations()) { - if (addTo(loc)) { - return true; - } - } - return false; - } - - // add a BalancerDatanode as proxy source for specific block movement - private boolean addTo(BalancerDatanode bdn) { - if (bdn.addPendingBlock(this)) { - proxySource = bdn; - return true; - } - return false; - } - - /* Dispatch the block move task to the proxy source & wait for the response - */ - private void dispatch() { - Socket sock = new Socket(); - DataOutputStream out = null; - DataInputStream in = null; - try { - sock.connect( - NetUtils.createSocketAddr(target.datanode.getXferAddr()), - HdfsServerConstants.READ_TIMEOUT); - /* Unfortunately we don't have a good way to know if the Datanode is - * taking a really long time to move a block, OR something has - * gone wrong and it's never going to finish. To deal with this - * scenario, we set a long timeout (20 minutes) to avoid hanging - * the balancer indefinitely. - */ - sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); - - sock.setKeepAlive(true); - - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); - if (nnc.getDataEncryptionKey() != null) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, nnc.getDataEncryptionKey()); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - in = new DataInputStream(new BufferedInputStream(unbufIn, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - - sendRequest(out); - receiveResponse(in); - bytesMoved.inc(block.getNumBytes()); - LOG.info("Successfully moved " + this); - } catch (IOException e) { - LOG.warn("Failed to move " + this + ": " + e.getMessage()); - /* proxy or target may have an issue, insert a small delay - * before using these nodes further. This avoids a potential storm - * of "threads quota exceeded" Warnings when the balancer - * gets out of sync with work going on in datanode. 
- */ - proxySource.activateDelay(DELAY_AFTER_ERROR); - target.activateDelay(DELAY_AFTER_ERROR); - } finally { - IOUtils.closeStream(out); - IOUtils.closeStream(in); - IOUtils.closeSocket(sock); - - proxySource.removePendingBlock(this); - target.removePendingBlock(this); - - synchronized (this ) { - reset(); - } - synchronized (Balancer.this) { - Balancer.this.notifyAll(); - } - } - } - - /* Send a block replace request to the output stream*/ - private void sendRequest(DataOutputStream out) throws IOException { - final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); - final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); - new Sender(out).replaceBlock(eb, accessToken, - source.getStorageID(), proxySource.getDatanode()); - } - - /* Receive a block copy response from the input stream */ - private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); - if (response.getStatus() != Status.SUCCESS) { - if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) - throw new IOException("block move failed due to access token error"); - throw new IOException("block move is failed: " + - response.getMessage()); - } - } - - /* reset the object */ - private void reset() { - block = null; - source = null; - proxySource = null; - target = null; - } - - /* start a thread to dispatch the block move */ - private void scheduleBlockMove() { - moverExecutor.execute(new Runnable() { - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Start moving " + PendingBlockMove.this); - } - dispatch(); - } - }); - } - } - - /* A class for keeping track of blocks in the Balancer */ - static private class BalancerBlock { - private final Block block; // the block - private final List<BalancerDatanode> locations - = new ArrayList<BalancerDatanode>(3); // its locations - - /* Constructor */ - private BalancerBlock(Block block) { - this.block = block; - } - - /* clean block locations */ - private synchronized void clearLocations() { - locations.clear(); - } - - /* add a location */ - private synchronized void addLocation(BalancerDatanode datanode) { - if (!locations.contains(datanode)) { - locations.add(datanode); - } - } - - /* Return if the block is located on <code>datanode</code> */ - private synchronized boolean isLocatedOnDatanode( - BalancerDatanode datanode) { - return locations.contains(datanode); - } - - /* Return its locations */ - private synchronized List<BalancerDatanode> getLocations() { - return locations; - } - - /* Return the block */ - private Block getBlock() { - return block; - } - - /* Return the length of the block */ - private long getNumBytes() { - return block.getNumBytes(); - } - } - - /* The class represents a desired move of bytes between two nodes - * and the target. - * An object of this class is stored in a source node. 
- */ - static private class NodeTask { - private final BalancerDatanode datanode; //target node - private long size; //bytes scheduled to move - - /* constructor */ - private NodeTask(BalancerDatanode datanode, long size) { - this.datanode = datanode; - this.size = size; - } - - /* Get the node */ - private BalancerDatanode getDatanode() { - return datanode; - } - - /* Get the number of bytes that need to be moved */ - private long getSize() { - return size; - } - } - - - /* A class that keeps track of a datanode in Balancer */ - private static class BalancerDatanode { - final private static long MAX_SIZE_TO_MOVE = 10*1024*1024*1024L; //10GB - final DatanodeInfo datanode; - final double utilization; - final long maxSize2Move; - private long scheduledSize = 0L; - protected long delayUntil = 0L; - // blocks being moved but not confirmed yet - private final List<PendingBlockMove> pendingBlocks = - new ArrayList<PendingBlockMove>(MAX_NUM_CONCURRENT_MOVES); - - @Override - public String toString() { - return getClass().getSimpleName() + "[" + datanode - + ", utilization=" + utilization + "]"; - } - - /* Constructor - * Depending on avgutil & threshold, calculate maximum bytes to move - */ - private BalancerDatanode(DatanodeInfo node, BalancingPolicy policy, double threshold) { - datanode = node; - utilization = policy.getUtilization(node); - final double avgUtil = policy.getAvgUtilization(); - long maxSizeToMove; - - if (utilization >= avgUtil+threshold - || utilization <= avgUtil-threshold) { - maxSizeToMove = (long)(threshold*datanode.getCapacity()/100); - } else { - maxSizeToMove = - (long)(Math.abs(avgUtil-utilization)*datanode.getCapacity()/100); - } - if (utilization < avgUtil ) { - maxSizeToMove = Math.min(datanode.getRemaining(), maxSizeToMove); - } - this.maxSize2Move = Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); - } - - /** Get the datanode */ - protected DatanodeInfo getDatanode() { - return datanode; - } - - /** Get the name of the datanode */ - protected String getDisplayName() { - return datanode.toString(); - } - - /* Get the storage id of the datanode */ - protected String getStorageID() { - return datanode.getDatanodeUuid(); - } - - /** Decide if still need to move more bytes */ - protected synchronized boolean hasSpaceForScheduling() { - return scheduledSize<maxSize2Move; - } - - /** Return the total number of bytes that need to be moved */ - protected synchronized long availableSizeToMove() { - return maxSize2Move-scheduledSize; - } - - /** increment scheduled size */ - protected synchronized void incScheduledSize(long size) { - scheduledSize += size; - } - - /** decrement scheduled size */ - protected synchronized void decScheduledSize(long size) { - scheduledSize -= size; - } - - /** get scheduled size */ - protected synchronized long getScheduledSize(){ - return scheduledSize; - } - - /** get scheduled size */ - protected synchronized void setScheduledSize(long size){ - scheduledSize = size; - } - - synchronized private void activateDelay(long delta) { - delayUntil = Time.now() + delta; - } - - synchronized private boolean isDelayActive() { - if (delayUntil == 0 || Time.now() > delayUntil){ - delayUntil = 0; - return false; - } - return true; - } - - /* Check if the node can schedule more blocks to move */ - synchronized private boolean isPendingQNotFull() { - if ( pendingBlocks.size() < MAX_NUM_CONCURRENT_MOVES ) { - return true; - } - return false; - } - - /* Check if all the dispatched moves are done */ - synchronized private boolean isPendingQEmpty() { - return 
pendingBlocks.isEmpty(); - } - - /* Add a scheduled block move to the node */ - private synchronized boolean addPendingBlock( - PendingBlockMove pendingBlock) { - if (!isDelayActive() && isPendingQNotFull()) { - return pendingBlocks.add(pendingBlock); - } - return false; - } - - /* Remove a scheduled block move from the node */ - private synchronized boolean removePendingBlock( - PendingBlockMove pendingBlock) { - return pendingBlocks.remove(pendingBlock); - } - } - - /** A node that can be the sources of a block move */ - private class Source extends BalancerDatanode { - - /* A thread that initiates a block move - * and waits for block move to complete */ - private class BlockMoveDispatcher implements Runnable { - @Override - public void run() { - dispatchBlocks(); - } - } - - private final ArrayList<NodeTask> nodeTasks = new ArrayList<NodeTask>(2); - private long blocksToReceive = 0L; - /* source blocks point to balancerBlocks in the global list because - * we want to keep one copy of a block in balancer and be aware that - * the locations are changing over time. - */ - private final List<BalancerBlock> srcBlockList - = new ArrayList<BalancerBlock>(); - - /* constructor */ - private Source(DatanodeInfo node, BalancingPolicy policy, double threshold) { - super(node, policy, threshold); - } - - /** Add a node task */ - private void addNodeTask(NodeTask task) { - assert (task.datanode != this) : - "Source and target are the same " + datanode; - incScheduledSize(task.getSize()); - nodeTasks.add(task); - } - - /* Return an iterator to this source's blocks */ - private Iterator<BalancerBlock> getBlockIterator() { - return srcBlockList.iterator(); - } - - /* fetch new blocks of this source from namenode and - * update this source's block list & the global block list - * Return the total size of the received blocks in the number of bytes. - */ - private long getBlockList() throws IOException { - BlockWithLocations[] newBlocks = nnc.namenode.getBlocks(datanode, - Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive)).getBlocks(); - long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks) { - bytesReceived += blk.getBlock().getNumBytes(); - BalancerBlock block; - synchronized(globalBlockList) { - block = globalBlockList.get(blk.getBlock()); - if (block==null) { - block = new BalancerBlock(blk.getBlock()); - globalBlockList.put(blk.getBlock(), block); - } else { - block.clearLocations(); - } - - synchronized (block) { - // update locations - for (String datanodeUuid : blk.getDatanodeUuids()) { - final BalancerDatanode d = datanodeMap.get(datanodeUuid); - if (datanode != null) { // not an unknown datanode - block.addLocation(d); - } - } - } - if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) { - // filter bad candidates - srcBlockList.add(block); - } - } - } - return bytesReceived; - } - - /* Decide if the given block is a good candidate to move or not */ - private boolean isGoodBlockCandidate(BalancerBlock block) { - for (NodeTask nodeTask : nodeTasks) { - if (Balancer.this.isGoodBlockCandidate(this, nodeTask.datanode, block)) { - return true; - } - } - return false; - } - - /* Return a block that's good for the source thread to dispatch immediately - * The block's source, target, and proxy source are determined too. - * When choosing proxy and target, source & target throttling - * has been considered. They are chosen only when they have the capacity - * to support this block move. - * The block should be dispatched immediately after this method is returned. 
- */ - private PendingBlockMove chooseNextBlockToMove() { - for ( Iterator<NodeTask> tasks=nodeTasks.iterator(); tasks.hasNext(); ) { - NodeTask task = tasks.next(); - BalancerDatanode target = task.getDatanode(); - PendingBlockMove pendingBlock = new PendingBlockMove(); - if (target.addPendingBlock(pendingBlock)) { - // target is not busy, so do a tentative block allocation - pendingBlock.source = this; - pendingBlock.target = target; - if ( pendingBlock.chooseBlockAndProxy() ) { - long blockSize = pendingBlock.block.getNumBytes(); - decScheduledSize(blockSize); - task.size -= blockSize; - if (task.size == 0) { - tasks.remove(); - } - return pendingBlock; - } else { - // cancel the tentative move - target.removePendingBlock(pendingBlock); - } - } - } - return null; - } - - /* iterate all source's blocks to remove moved ones */ - private void filterMovedBlocks() { - for (Iterator<BalancerBlock> blocks=getBlockIterator(); - blocks.hasNext();) { - if (movedBlocks.contains(blocks.next())) { - blocks.remove(); - } - } - } - - private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5; - /* Return if should fetch more blocks from namenode */ - private boolean shouldFetchMoreBlocks() { - return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE && - blocksToReceive>0; - } - - /* This method iteratively does the following: - * it first selects a block to move, - * then sends a request to the proxy source to start the block move - * when the source's block list falls below a threshold, it asks - * the namenode for more blocks. - * It terminates when it has dispatch enough block move tasks or - * it has received enough blocks from the namenode, or - * the elapsed time of the iteration has exceeded the max time limit. - */ - private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins - private void dispatchBlocks() { - long startTime = Time.now(); - long scheduledSize = getScheduledSize(); - this.blocksToReceive = 2*scheduledSize; - boolean isTimeUp = false; - int noPendingBlockIteration = 0; - while(!isTimeUp && getScheduledSize()>0 && - (!srcBlockList.isEmpty() || blocksToReceive>0)) { - PendingBlockMove pendingBlock = chooseNextBlockToMove(); - if (pendingBlock != null) { - // move the block - pendingBlock.scheduleBlockMove(); - continue; - } - - /* Since we can not schedule any block to move, - * filter any moved blocks from the source block list and - * check if we should fetch more blocks from the namenode - */ - filterMovedBlocks(); // filter already moved blocks - if (shouldFetchMoreBlocks()) { - // fetch new blocks - try { - blocksToReceive -= getBlockList(); - continue; - } catch (IOException e) { - LOG.warn("Exception while getting block list", e); - return; - } - } else { - // source node cannot find a pendingBlockToMove, iteration +1 - noPendingBlockIteration++; - // in case no blocks can be moved for source node's task, - // jump out of while-loop after 5 iterations. - if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - setScheduledSize(0); - } - } - - // check if time is up or not - if (Time.now()-startTime > MAX_ITERATION_TIME) { - isTimeUp = true; - continue; - } - - /* Now we can not schedule any block to move and there are - * no new blocks added to the source block list, so we wait. 
- */ - try { - synchronized(Balancer.this) { - Balancer.this.wait(1000); // wait for targets/sources to be idle - } - } catch (InterruptedException ignored) { - } - } - } - } + private final Collection<Source> overUtilized = new LinkedList<Source>(); + private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); + private final Collection<StorageGroup> belowAvgUtilized + = new LinkedList<StorageGroup>(); + private final Collection<StorageGroup> underUtilized + = new LinkedList<StorageGroup>(); /* Check that this Balancer is compatible with the Block Placement Policy * used by the Namenode. */ private static void checkReplicationPolicyCompatibility(Configuration conf ) throws UnsupportedActionException { - if (!(BlockPlacementPolicy.getInstance(conf, null, null) instanceof + if (!(BlockPlacementPolicy.getInstance(conf, null, null, null) instanceof BlockPlacementPolicyDefault)) { throw new UnsupportedActionException( "Balancer without BlockPlacementPolicyDefault"); @@ -857,190 +210,185 @@ public class Balancer { * when connection fails. */ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + final int moverThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT); + final int dispatcherThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT); + final int maxConcurrentMovesPerNode = conf.getInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + + this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, + p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, + maxConcurrentMovesPerNode, conf); this.threshold = p.threshold; this.policy = p.policy; - this.nnc = theblockpool; - cluster = NetworkTopology.getInstance(conf); - - this.moverExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); - this.dispatcherExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); } - /* Given a data node set, build a network topology and decide - * over-utilized datanodes, above average utilized datanodes, - * below average utilized datanodes, and underutilized datanodes. - * The input data node set is shuffled before the datanodes - * are put into the over-utilized datanodes, above average utilized - * datanodes, below average utilized datanodes, and - * underutilized datanodes lists. This will add some randomness - * to the node matching later on. - * - * @return the total number of bytes that are - * needed to move to make the cluster balanced. 
- * @param datanodes a set of datanodes + private static long getCapacity(DatanodeStorageReport report, StorageType t) { + long capacity = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + capacity += r.getCapacity(); + } + } + return capacity; + } + + private static long getRemaining(DatanodeStorageReport report, StorageType t) { + long remaining = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + remaining += r.getRemaining(); + } + } + return remaining; + } + + /** + * Given a datanode storage set, build a network topology and decide + * over-utilized storages, above average utilized storages, + * below average utilized storages, and underutilized storages. + * The input datanode storage set is shuffled in order to randomize + * to the storage matching later on. + * + * @return the number of bytes needed to move in order to balance the cluster. */ - private long initNodes(DatanodeInfo[] datanodes) { + private long init(List<DatanodeStorageReport> reports) { // compute average utilization - for (DatanodeInfo datanode : datanodes) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { - continue; // ignore decommissioning or decommissioned nodes - } - policy.accumulateSpaces(datanode); + for (DatanodeStorageReport r : reports) { + policy.accumulateSpaces(r); } policy.initAvgUtilization(); - /*create network topology and all data node lists: - * overloaded, above-average, below-average, and underloaded - * we alternates the accessing of the given datanodes array either by - * an increasing order or a decreasing order. - */ + // create network topology and classify utilization collections: + // over-utilized, above-average, below-average and under-utilized. 
long overLoadedBytes = 0L, underLoadedBytes = 0L; - for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { - continue; // ignore decommissioning or decommissioned nodes - } - cluster.add(datanode); - BalancerDatanode datanodeS; - final double avg = policy.getAvgUtilization(); - if (policy.getUtilization(datanode) >= avg) { - datanodeS = new Source(datanode, policy, threshold); - if (isAboveAvgUtilized(datanodeS)) { - this.aboveAvgUtilizedDatanodes.add((Source)datanodeS); - } else { - assert(isOverUtilized(datanodeS)) : - datanodeS.getDisplayName()+ "is not an overUtilized node"; - this.overUtilizedDatanodes.add((Source)datanodeS); - overLoadedBytes += (long)((datanodeS.utilization-avg - -threshold)*datanodeS.datanode.getCapacity()/100.0); + for(DatanodeStorageReport r : reports) { + final DDatanode dn = dispatcher.newDatanode(r); + for(StorageType t : StorageType.asList()) { + final Double utilization = policy.getUtilization(r, t); + if (utilization == null) { // datanode does not have such storage type + continue; } - } else { - datanodeS = new BalancerDatanode(datanode, policy, threshold); - if ( isBelowOrEqualAvgUtilized(datanodeS)) { - this.belowAvgUtilizedDatanodes.add(datanodeS); + + final long capacity = getCapacity(r, t); + final double utilizationDiff = utilization - policy.getAvgUtilization(t); + final double thresholdDiff = Math.abs(utilizationDiff) - threshold; + final long maxSize2Move = computeMaxSize2Move(capacity, + getRemaining(r, t), utilizationDiff, threshold); + + final StorageGroup g; + if (utilizationDiff > 0) { + final Source s = dn.addSource(t, maxSize2Move, dispatcher); + if (thresholdDiff <= 0) { // within threshold + aboveAvgUtilized.add(s); + } else { + overLoadedBytes += precentage2bytes(thresholdDiff, capacity); + overUtilized.add(s); + } + g = s; } else { - assert isUnderUtilized(datanodeS) : "isUnderUtilized(" - + datanodeS.getDisplayName() + ")=" + isUnderUtilized(datanodeS) - + ", utilization=" + datanodeS.utilization; - this.underUtilizedDatanodes.add(datanodeS); - underLoadedBytes += (long)((avg-threshold- - datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); + g = dn.addStorageGroup(t, maxSize2Move); + if (thresholdDiff <= 0) { // within threshold + belowAvgUtilized.add(g); + } else { + underLoadedBytes += precentage2bytes(thresholdDiff, capacity); + underUtilized.add(g); + } } + dispatcher.getStorageGroupMap().put(g); } - datanodeMap.put(datanode.getDatanodeUuid(), datanodeS); } - //logging - logNodes(); + logUtilizationCollections(); - assert (this.datanodeMap.size() == - overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ - aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) - : "Mismatched number of datanodes"; + Preconditions.checkState(dispatcher.getStorageGroupMap().size() + == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size() + + belowAvgUtilized.size(), + "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced return Math.max(overLoadedBytes, underLoadedBytes); } - /* log the over utilized & under utilized nodes */ - private void logNodes() { - logNodes("over-utilized", overUtilizedDatanodes); - if (LOG.isTraceEnabled()) { - logNodes("above-average", aboveAvgUtilizedDatanodes); - logNodes("below-average", belowAvgUtilizedDatanodes); - } - logNodes("underutilized", underUtilizedDatanodes); - } - - private static <T extends BalancerDatanode> void logNodes( 
- String name, Collection<T> nodes) { - LOG.info(nodes.size() + " " + name + ": " + nodes); + private static long computeMaxSize2Move(final long capacity, final long remaining, + final double utilizationDiff, final double threshold) { + final double diff = Math.min(threshold, Math.abs(utilizationDiff)); + long maxSizeToMove = precentage2bytes(diff, capacity); + if (utilizationDiff < 0) { + maxSizeToMove = Math.min(remaining, maxSizeToMove); + } + return Math.min(MAX_SIZE_TO_MOVE, maxSizeToMove); } - /** A matcher interface for matching nodes. */ - private interface Matcher { - /** Given the cluster topology, does the left node match the right node? */ - boolean match(NetworkTopology cluster, Node left, Node right); + private static long precentage2bytes(double precentage, long capacity) { + Preconditions.checkArgument(precentage >= 0, + "precentage = " + precentage + " < 0"); + return (long)(precentage * capacity / 100.0); } - /** Match datanodes in the same node group. */ - static final Matcher SAME_NODE_GROUP = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameNodeGroup(left, right); - } - }; - - /** Match datanodes in the same rack. */ - static final Matcher SAME_RACK = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameRack(left, right); + /* log the over utilized & under utilized nodes */ + private void logUtilizationCollections() { + logUtilizationCollection("over-utilized", overUtilized); + if (LOG.isTraceEnabled()) { + logUtilizationCollection("above-average", aboveAvgUtilized); + logUtilizationCollection("below-average", belowAvgUtilized); } - }; + logUtilizationCollection("underutilized", underUtilized); + } - /** Match any datanode with any other datanode. */ - static final Matcher ANY_OTHER = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return left != right; - } - }; + private static <T extends StorageGroup> + void logUtilizationCollection(String name, Collection<T> items) { + LOG.info(items.size() + " " + name + ": " + items); + } /** * Decide all <source, target> pairs and * the number of bytes to move from a source to a target - * Maximum bytes to be moved per node is - * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). - * Return total number of bytes to move in this iteration + * Maximum bytes to be moved per storage group is + * min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). 
+ * @return total number of bytes to move in this iteration */ - private long chooseNodes() { + private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware - if (cluster.isNodeGroupAware()) { - chooseNodes(SAME_NODE_GROUP); + if (dispatcher.getCluster().isNodeGroupAware()) { + chooseStorageGroups(Matcher.SAME_NODE_GROUP); } // Then, match nodes on the same rack - chooseNodes(SAME_RACK); + chooseStorageGroups(Matcher.SAME_RACK); // At last, match all remaining nodes - chooseNodes(ANY_OTHER); + chooseStorageGroups(Matcher.ANY_OTHER); - assert (datanodeMap.size() >= sources.size()+targets.size()) - : "Mismatched number of datanodes (" + - datanodeMap.size() + " total, " + - sources.size() + " sources, " + - targets.size() + " targets)"; - - long bytesToMove = 0L; - for (Source src : sources) { - bytesToMove += src.getScheduledSize(); - } - return bytesToMove; + return dispatcher.bytesToMove(); } /** Decide all <source, target> pairs according to the matcher. */ - private void chooseNodes(final Matcher matcher) { + private void chooseStorageGroups(final Matcher matcher) { /* first step: match each overUtilized datanode (source) to * one or more underUtilized datanodes (targets). */ - chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher); + chooseStorageGroups(overUtilized, underUtilized, matcher); /* match each remaining overutilized datanode (source) to * below average utilized datanodes (targets). * Note only overutilized datanodes that haven't had that max bytes to move * satisfied in step 1 are selected */ - chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher); + chooseStorageGroups(overUtilized, belowAvgUtilized, matcher); /* match each remaining underutilized datanode (target) to * above average utilized datanodes (source). * Note only underutilized datanodes that have not had that max bytes to * move satisfied in step 1 are selected. */ - chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher); + chooseStorageGroups(underUtilized, aboveAvgUtilized, matcher); } /** @@ -1048,13 +396,13 @@ public class Balancer { * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ - private <D extends BalancerDatanode, C extends BalancerDatanode> void - chooseDatanodes(Collection<D> datanodes, Collection<C> candidates, + private <G extends StorageGroup, C extends StorageGroup> + void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, Matcher matcher) { - for (Iterator<D> i = datanodes.iterator(); i.hasNext();) { - final D datanode = i.next(); - for(; chooseForOneDatanode(datanode, candidates, matcher); ); - if (!datanode.hasSpaceForScheduling()) { + for(final Iterator<G> i = groups.iterator(); i.hasNext();) { + final G g = i.next(); + for(; choose4One(g, candidates, matcher); ); + if (!g.hasSpaceForScheduling()) { i.remove(); } } @@ -1064,18 +412,18 @@ public class Balancer { * For the given datanode, choose a candidate and then schedule it. * @return true if a candidate is chosen; false if no candidates is chosen. 
*/ - private <C extends BalancerDatanode> boolean chooseForOneDatanode( - BalancerDatanode dn, Collection<C> candidates, Matcher matcher) { + private <C extends StorageGroup> boolean choose4One(StorageGroup g, + Collection<C> candidates, Matcher matcher) { final Iterator<C> i = candidates.iterator(); - final C chosen = chooseCandidate(dn, i, matcher); - + final C chosen = chooseCandidate(g, i, matcher); + if (chosen == null) { return false; } - if (dn instanceof Source) { - matchSourceWithTargetToMove((Source)dn, chosen); + if (g instanceof Source) { + matchSourceWithTargetToMove((Source)g, chosen); } else { - matchSourceWithTargetToMove((Source)chosen, dn); + matchSourceWithTargetToMove((Source)chosen, g); } if (!chosen.hasSpaceForScheduling()) { i.remove(); @@ -1083,27 +431,26 @@ public class Balancer { return true; } - private void matchSourceWithTargetToMove( - Source source, BalancerDatanode target) { + private void matchSourceWithTargetToMove(Source source, StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); - NodeTask nodeTask = new NodeTask(target, size); - source.addNodeTask(nodeTask); - target.incScheduledSize(nodeTask.getSize()); - sources.add(source); - targets.add(target); + final Task task = new Task(target, size); + source.addTask(task); + target.incScheduledSize(task.getSize()); + dispatcher.add(source, target); LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " - +source.datanode.getName() + " to " + target.datanode.getName()); + + source.getDisplayName() + " to " + target.getDisplayName()); } /** Choose a candidate for the given datanode. */ - private <D extends BalancerDatanode, C extends BalancerDatanode> - C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) { - if (dn.hasSpaceForScheduling()) { + private <G extends StorageGroup, C extends StorageGroup> + C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { + if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { final C c = candidates.next(); if (!c.hasSpaceForScheduling()) { candidates.remove(); - } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) { + } else if (matcher.match(dispatcher.getCluster(), + g.getDatanodeInfo(), c.getDatanodeInfo())) { return c; } } @@ -1111,283 +458,25 @@ public class Balancer { return null; } - private static class BytesMoved { - private long bytesMoved = 0L;; - private synchronized void inc( long bytes ) { - bytesMoved += bytes; - } - - private synchronized long get() { - return bytesMoved; - } - }; - private final BytesMoved bytesMoved = new BytesMoved(); - - /* Start a thread to dispatch block moves for each source. - * The thread selects blocks to move & sends request to proxy source to - * initiate block move. The process is flow controlled. Block selection is - * blocked if there are too many un-confirmed block moves. - * Return the total number of bytes successfully moved in this iteration. 
- */ - private long dispatchBlockMoves() throws InterruptedException { - long bytesLastMoved = bytesMoved.get(); - Future<?>[] futures = new Future<?>[sources.size()]; - int i=0; - for (Source source : sources) { - futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher()); - } - - // wait for all dispatcher threads to finish - for (Future<?> future : futures) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.warn("Dispatcher thread failed", e.getCause()); - } - } - - // wait for all block moving to be done - waitForMoveCompletion(); - - return bytesMoved.get()-bytesLastMoved; - } - - // The sleeping period before checking if block move is completed again - static private long blockMoveWaitTime = 30000L; - - /** set the sleeping period for block move completion check */ - static void setBlockMoveWaitTime(long time) { - blockMoveWaitTime = time; - } - - /* wait for all block move confirmations - * by checking each target's pendingMove queue - */ - private void waitForMoveCompletion() { - boolean shouldWait; - do { - shouldWait = false; - for (BalancerDatanode target : targets) { - if (!target.isPendingQEmpty()) { - shouldWait = true; - } - } - if (shouldWait) { - try { - Thread.sleep(blockMoveWaitTime); - } catch (InterruptedException ignored) { - } - } - } while (shouldWait); - } - - /** This window makes sure to keep blocks that have been moved within 1.5 hour. - * Old window has blocks that are older; - * Current window has blocks that are more recent; - * Cleanup method triggers the check if blocks in the old window are - * more than 1.5 hour old. If yes, purge the old window and then - * move blocks in current window to old window. - */ - private static class MovedBlocks { - private long lastCleanupTime = Time.now(); - final private static int CUR_WIN = 0; - final private static int OLD_WIN = 1; - final private static int NUM_WINS = 2; - final private List<HashMap<Block, BalancerBlock>> movedBlocks = - new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS); - - /* initialize the moved blocks collection */ - private MovedBlocks() { - movedBlocks.add(new HashMap<Block,BalancerBlock>()); - movedBlocks.add(new HashMap<Block,BalancerBlock>()); - } - - /* add a block thus marking a block to be moved */ - synchronized private void add(BalancerBlock block) { - movedBlocks.get(CUR_WIN).put(block.getBlock(), block); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(BalancerBlock block) { - return contains(block.getBlock()); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(Block block) { - return movedBlocks.get(CUR_WIN).containsKey(block) || - movedBlocks.get(OLD_WIN).containsKey(block); - } - - /* remove old blocks */ - synchronized private void cleanup() { - long curTime = Time.now(); - // check if old win is older than winWidth - if (lastCleanupTime + WIN_WIDTH <= curTime) { - // purge the old window - movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN)); - movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>()); - lastCleanupTime = curTime; - } - } - } - - /* Decide if it is OK to move the given block from source to target - * A block is a good candidate if - * 1. the block is not in the process of being moved/has not been moved; - * 2. the block does not have a replica on the target; - * 3. 
doing the move does not reduce the number of racks that the block has - */ - private boolean isGoodBlockCandidate(Source source, - BalancerDatanode target, BalancerBlock block) { - // check if the block is moved or not - if (movedBlocks.contains(block)) { - return false; - } - if (block.isLocatedOnDatanode(target)) { - return false; - } - if (cluster.isNodeGroupAware() && - isOnSameNodeGroupWithReplicas(target, block, source)) { - return false; - } - - boolean goodBlock = false; - if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { - // good if source and target are on the same rack - goodBlock = true; - } else { - boolean notOnSameRack = true; - synchronized (block) { - for (BalancerDatanode loc : block.locations) { - if (cluster.isOnSameRack(loc.datanode, target.datanode)) { - notOnSameRack = false; - break; - } - } - } - if (notOnSameRack) { - // good if target is target is not on the same rack as any replica - goodBlock = true; - } else { - // good if source is on the same rack as on of the replicas - for (BalancerDatanode loc : block.locations) { - if (loc != source && - cluster.isOnSameRack(loc.datanode, source.datanode)) { - goodBlock = true; - break; - } - } - } - } - return goodBlock; - } - - /** - * Check if there are any replica (other than source) on the same node group - * with target. If true, then target is not a good candidate for placing - * specific block replica as we don't want 2 replicas under the same nodegroup - * after balance. - * @param target targetDataNode - * @param block dataBlock - * @param source sourceDataNode - * @return true if there are any replica (other than source) on the same node - * group with target - */ - private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode target, - BalancerBlock block, Source source) { - for (BalancerDatanode loc : block.locations) { - if (loc != source && - cluster.isOnSameNodeGroup(loc.getDatanode(), target.getDatanode())) { - return true; - } - } - return false; - } - /* reset all fields in a balancer preparing for the next iteration */ private void resetData(Configuration conf) { - this.cluster = NetworkTopology.getInstance(conf); - this.overUtilizedDatanodes.clear(); - this.aboveAvgUtilizedDatanodes.clear(); - this.belowAvgUtilizedDatanodes.clear(); - this.underUtilizedDatanodes.clear(); - this.datanodeMap.clear(); - this.sources.clear(); - this.targets.clear(); + this.overUtilized.clear(); + this.aboveAvgUtilized.clear(); + this.belowAvgUtilized.clear(); + this.underUtilized.clear(); this.policy.reset(); - cleanGlobalBlockList(); - this.movedBlocks.cleanup(); - } - - /* Remove all blocks from the global block list except for the ones in the - * moved list. 
- */ - private void cleanGlobalBlockList() { - for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator(); - globalBlockListIterator.hasNext();) { - Block block = globalBlockListIterator.next(); - if(!movedBlocks.contains(block)) { - globalBlockListIterator.remove(); - } - } - } - - /* Return true if the given datanode is overUtilized */ - private boolean isOverUtilized(BalancerDatanode datanode) { - return datanode.utilization > (policy.getAvgUtilization()+threshold); - } - - /* Return true if the given datanode is above or equal to average utilized - * but not overUtilized */ - private boolean isAboveAvgUtilized(BalancerDatanode datanode) { - final double avg = policy.getAvgUtilization(); - return (datanode.utilization <= (avg+threshold)) - && (datanode.utilization >= avg); + dispatcher.reset(conf);; } - /* Return true if the given datanode is underUtilized */ - private boolean isUnderUtilized(BalancerDatanode datanode) { - return datanode.utilization < (policy.getAvgUtilization()-threshold); - } - - /* Return true if the given datanode is below average utilized - * but not underUtilized */ - private boolean isBelowOrEqualAvgUtilized(BalancerDatanode datanode) { - final double avg = policy.getAvgUtilization(); - return (datanode.utilization >= (avg-threshold)) - && (datanode.utilization <= avg); - } - - // Exit status - enum ReturnStatus { - // These int values will map directly to the balancer process's exit code. - SUCCESS(0), - IN_PROGRESS(1), - ALREADY_RUNNING(-1), - NO_MOVE_BLOCK(-2), - NO_MOVE_PROGRESS(-3), - IO_EXCEPTION(-4), - ILLEGAL_ARGS(-5), - INTERRUPTED(-6); - - final int code; - - ReturnStatus(int code) { - this.code = code; - } - } - /** Run an iteration for all datanodes. */ - private ReturnStatus run(int iteration, Formatter formatter, + private ExitStatus run(int iteration, Formatter formatter, Configuration conf) { try { - /* get all live datanodes of a cluster and their disk usage - * decide the number of bytes need to be moved - */ - final long bytesLeftToMove = initNodes(nnc.client.getDatanodeReport(DatanodeReportType.LIVE)); + final List<DatanodeStorageReport> reports = dispatcher.init(); + final long bytesLeftToMove = init(reports); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); - return ReturnStatus.SUCCESS; + return ExitStatus.SUCCESS; } else { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); @@ -1398,10 +487,10 @@ public class Balancer { * in this iteration. Maximum bytes to be moved per node is * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). */ - final long bytesToMove = chooseNodes(); + final long bytesToMove = chooseStorageGroups(); if (bytesToMove == 0) { System.out.println("No block can be moved. Exiting..."); - return ReturnStatus.NO_MOVE_BLOCK; + return ExitStatus.NO_MOVE_BLOCK; } else { LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + " in this iteration"); @@ -1410,7 +499,7 @@ public class Balancer { formatter.format("%-24s %10d %19s %18s %17s%n", DateFormat.getDateTimeInstance().format(new Date()), iteration, - StringUtils.byteDesc(bytesMoved.get()), + StringUtils.byteDesc(dispatcher.getBytesMoved()), StringUtils.byteDesc(bytesLeftToMove), StringUtils.byteDesc(bytesToMove) ); @@ -1421,24 +510,22 @@ public class Balancer { * available to move. * Exit no byte has been moved for 5 consecutive iterations. 
*/ - if (!this.nnc.shouldContinue(dispatchBlockMoves())) { - return ReturnStatus.NO_MOVE_PROGRESS; + if (!dispatcher.dispatchAndCheckContinue()) { + return ExitStatus.NO_MOVE_PROGRESS; } - return ReturnStatus.IN_PROGRESS; + return ExitStatus.IN_PROGRESS; } catch (IllegalArgumentException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.ILLEGAL_ARGS; + return ExitStatus.ILLEGAL_ARGUMENTS; } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION; + return ExitStatus.IO_EXCEPTION; } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED; + return ExitStatus.INTERRUPTED; } finally { - // shutdown thread pools - dispatcherExecutor.shutdownNow(); - moverExecutor.shutdownNow(); + dispatcher.shutdownNow(); } } @@ -1453,8 +540,8 @@ public class Balancer { final long sleeptime = 2000*conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); - LOG.info("namenodes = " + namenodes); - LOG.info("p = " + p); + LOG.info("namenodes = " + namenodes); + LOG.info("parameters = " + p); final Formatter formatter = new Formatter(System.out); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); @@ -1463,7 +550,10 @@ public class Balancer { = new ArrayList<NameNodeConnector>(namenodes.size()); try { for (URI uri : namenodes) { - connectors.add(new NameNodeConnector(uri, conf)); + final NameNodeConnector nnc = new NameNodeConnector( + Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); } boolean done = false; @@ -1472,14 +562,14 @@ public class Balancer { Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { final Balancer b = new Balancer(nnc, p, conf); - final ReturnStatus r = b.run(iteration, formatter, conf); + final ExitStatus r = b.run(iteration, formatter, conf); // clean all lists b.resetData(conf); - if (r == ReturnStatus.IN_PROGRESS) { + if (r == ExitStatus.IN_PROGRESS) { done = false; - } else if (r != ReturnStatus.SUCCESS) { + } else if (r != ExitStatus.SUCCESS) { //must be an error statue, return. - return r.code; + return r.getExitCode(); } } @@ -1492,7 +582,7 @@ public class Balancer { nnc.close(); } } - return ReturnStatus.SUCCESS.code; + return ExitStatus.SUCCESS.getExitCode(); } /* Given elaspedTime in ms, return a printable string */ @@ -1516,21 +606,31 @@ public class Balancer { } static class Parameters { - static final Parameters DEFALUT = new Parameters( - BalancingPolicy.Node.INSTANCE, 10.0); + static final Parameters DEFAULT = new Parameters( + BalancingPolicy.Node.INSTANCE, 10.0, + Collections.<String> emptySet(), Collections.<String> emptySet()); final BalancingPolicy policy; final double threshold; + // exclude the nodes in this set from balancing operations + Set<String> nodesToBeExcluded; + //include only these nodes in balancing operations + Set<String> nodesToBeIncluded; - Parameters(BalancingPolicy policy, double threshold) { + Parameters(BalancingPolicy policy, double threshold, + Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) { this.policy = policy; this.threshold = threshold; + this.nodesToBeExcluded = nodesToBeExcluded; + this.nodesToBeIncluded = nodesToBeIncluded; } @Override public String toString() { return Balancer.class.getSimpleName() + "." 
+ getClass().getSimpleName() - + "[" + policy + ", threshold=" + threshold + "]"; + + "[" + policy + ", threshold=" + threshold + + ", number of nodes to be excluded = "+ nodesToBeExcluded.size() + + ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; } } @@ -1545,9 +645,6 @@ public class Balancer { public int run(String[] args) { final long startTime = Time.now(); final Configuration conf = getConf(); - WIN_WIDTH = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); try { checkReplicationPolicyCompatibility(conf); @@ -1556,26 +653,29 @@ public class Balancer { return Balancer.run(namenodes, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION.code; + return ExitStatus.IO_EXCEPTION.getExitCode(); } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED.code; + return ExitStatus.INTERRUPTED.getExitCode(); } finally { + System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Balancing took " + time2Str(Time.now()-startTime)); } } /** parse command line arguments */ static Parameters parse(String[] args) { - BalancingPolicy policy = Parameters.DEFALUT.policy; - double threshold = Parameters.DEFALUT.threshold; + BalancingPolicy policy = Parameters.DEFAULT.policy; + double threshold = Parameters.DEFAULT.threshold; + Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; + Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; if (args != null) { try { for(int i = 0; i < args.length; i++) { - checkArgument(args.length >= 2, "args = " + Arrays.toString(args)); if ("-threshold".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "Threshold value is missing: args = " + Arrays.toString(args)); try { threshold = Double.parseDouble(args[i]); if (threshold < 1 || threshold > 100) { @@ -1590,25 +690,52 @@ public class Balancer { throw e; } } else if ("-policy".equalsIgnoreCase(args[i])) { - i++; + checkArgument(++i < args.length, + "Policy value is missing: args = " + Arrays.toString(args)); try { policy = BalancingPolicy.parse(args[i]); } catch(IllegalArgumentException e) { System.err.println("Illegal policy name: " + args[i]); throw e; } + } else if ("-exclude".equalsIgnoreCase(args[i])) { + checkArgument(++i < args.length, + "List of nodes to exclude | -f <filename> is missing: args = " + + Arrays.toString(args)); + if ("-f".equalsIgnoreCase(args[i])) { + checkArgument(++i < args.length, + "File containing nodes to exclude is not specified: args = " + + Arrays.toString(args)); + nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude"); + } else { + nodesTobeExcluded = Util.parseHostList(args[i]); + } + } else if ("-include".equalsIgnoreCase(args[i])) { + checkArgument(++i < args.length, + "List of nodes to include | -f <filename> is missing: args = " + + Arrays.toString(args)); + if ("-f".equalsIgnoreCase(args[i])) { + checkArgument(++i < args.length, + "File containing nodes to include is not specified: args = " + + Arrays.toString(args)); + nodesTobeIncluded = Util.getHostListFromFile(args[i], "include"); + } else { + nodesTobeIncluded = Util.parseHostList(args[i]); + } } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); } } + checkArgument(nodesTobeExcluded.isEmpty() || nodesTobeIncluded.isEmpty(), + "-exclude and -include options cannot be specified together."); } 
       } catch(RuntimeException e) {
         printUsage(System.err);
         throw e;
       }
     }
-    return new Parameters(policy, threshold);
+    return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
   }

   private static void printUsage(PrintStream out) {
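The new parse() drops the blanket args.length >= 2 precondition in favor of a per-flag bounds check: checkArgument(++i < args.length, ...) advances to the flag's value and fails fast with a targeted message when that value is missing, instead of letting an ArrayIndexOutOfBoundsException escape. A self-contained sketch of the same scanning pattern, with a hypothetical class name and only a -threshold flag:

import static com.google.common.base.Preconditions.checkArgument;

import java.util.Arrays;

// Hypothetical standalone demo of the bounds-checked flag-scanning pattern
// used by Balancer.parse(); it is not part of the Hadoop source.
public class FlagParseSketch {
  public static void main(String[] args) {
    double threshold = 10.0;
    for (int i = 0; i < args.length; i++) {
      if ("-threshold".equalsIgnoreCase(args[i])) {
        // ++i moves the cursor onto the flag's value; the check fails fast
        // with a clear message if the value is missing.
        checkArgument(++i < args.length,
            "Threshold value is missing: args = " + Arrays.toString(args));
        threshold = Double.parseDouble(args[i]);
      } else {
        throw new IllegalArgumentException("args = " + Arrays.toString(args));
      }
    }
    System.out.println("threshold = " + threshold);
  }
}

The same ++i guard is repeated above for -policy, -exclude, and -include, including the nested check for the -f <hosts-file> form.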
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancingPolicy.java Tue Aug 19 23:49:39 2014
@@ -18,7 +18,11 @@
 package org.apache.hadoop.hdfs.server.balancer;

 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.hdfs.util.EnumCounters;
+import org.apache.hadoop.hdfs.util.EnumDoubles;

 /**
  * Balancing policy.
@@ -28,31 +32,43 @@ import org.apache.hadoop.hdfs.protocol.D
  */
 @InterfaceAudience.Private
 abstract class BalancingPolicy {
-  long totalCapacity;
-  long totalUsedSpace;
-  private double avgUtilization;
+  final EnumCounters<StorageType> totalCapacities
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumCounters<StorageType> totalUsedSpaces
+      = new EnumCounters<StorageType>(StorageType.class);
+  final EnumDoubles<StorageType> avgUtilizations
+      = new EnumDoubles<StorageType>(StorageType.class);

   void reset() {
-    totalCapacity = 0L;
-    totalUsedSpace = 0L;
-    avgUtilization = 0.0;
+    totalCapacities.reset();
+    totalUsedSpaces.reset();
+    avgUtilizations.reset();
   }

   /** Get the policy name. */
   abstract String getName();

   /** Accumulate used space and capacity. */
-  abstract void accumulateSpaces(DatanodeInfo d);
+  abstract void accumulateSpaces(DatanodeStorageReport r);

   void initAvgUtilization() {
-    this.avgUtilization = totalUsedSpace*100.0/totalCapacity;
+    for(StorageType t : StorageType.asList()) {
+      final long capacity = totalCapacities.get(t);
+      if (capacity > 0L) {
+        final double avg = totalUsedSpaces.get(t)*100.0/capacity;
+        avgUtilizations.set(t, avg);
+      }
+    }
   }
-  double getAvgUtilization() {
-    return avgUtilization;
+
+  double getAvgUtilization(StorageType t) {
+    return avgUtilizations.get(t);
   }

-  /** Return the utilization of a datanode */
-  abstract double getUtilization(DatanodeInfo d);
+  /** @return the utilization of a particular storage type of a datanode;
+   *         or return null if the datanode does not have such storage type.
+   */
+  abstract Double getUtilization(DatanodeStorageReport r, StorageType t);

   @Override
   public String toString() {
@@ -84,14 +100,25 @@ abstract class BalancingPolicy {
     }

     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getDfsUsed();
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getDfsUsed());
+      }
     }

     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getDfsUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long dfsUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          dfsUsed += s.getDfsUsed();
+        }
+      }
+      return capacity == 0L? null: dfsUsed*100.0/capacity;
     }
   }

@@ -108,14 +135,25 @@ abstract class BalancingPolicy {
     }

     @Override
-    void accumulateSpaces(DatanodeInfo d) {
-      totalCapacity += d.getCapacity();
-      totalUsedSpace += d.getBlockPoolUsed();
+    void accumulateSpaces(DatanodeStorageReport r) {
+      for(StorageReport s : r.getStorageReports()) {
+        final StorageType t = s.getStorage().getStorageType();
+        totalCapacities.add(t, s.getCapacity());
+        totalUsedSpaces.add(t, s.getBlockPoolUsed());
+      }
     }

     @Override
-    double getUtilization(DatanodeInfo d) {
-      return d.getBlockPoolUsed()*100.0/d.getCapacity();
+    Double getUtilization(DatanodeStorageReport r, final StorageType t) {
+      long capacity = 0L;
+      long blockPoolUsed = 0L;
+      for(StorageReport s : r.getStorageReports()) {
+        if (s.getStorage().getStorageType() == t) {
+          capacity += s.getCapacity();
+          blockPoolUsed += s.getBlockPoolUsed();
+        }
+      }
+      return capacity == 0L? null: blockPoolUsed*100.0/capacity;
     }
   }
 }
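With storage types in play, BalancingPolicy now keys capacity and usage by StorageType and returns a boxed Double from getUtilization(), so that a datanode lacking a storage type (null) is distinguishable from one at 0% utilization. A self-contained sketch of that accounting, using a plain EnumMap and a stub enum in place of the real StorageType, StorageReport, and EnumCounters classes:

import java.util.EnumMap;
import java.util.Map;

// Self-contained sketch of per-storage-type utilization accounting.
// StorageTypeSketch and the accumulate() arguments stand in for the real
// StorageType/StorageReport types; this is not the Hadoop implementation.
public class UtilizationSketch {
  enum StorageTypeSketch { DISK, SSD }

  private final Map<StorageTypeSketch, Long> capacities =
      new EnumMap<StorageTypeSketch, Long>(StorageTypeSketch.class);
  private final Map<StorageTypeSketch, Long> used =
      new EnumMap<StorageTypeSketch, Long>(StorageTypeSketch.class);

  /** Accumulate one storage's capacity and usage under its type. */
  void accumulate(StorageTypeSketch t, long capacity, long dfsUsed) {
    capacities.merge(t, capacity, Long::sum);
    used.merge(t, dfsUsed, Long::sum);
  }

  /** @return utilization in percent, or null if no capacity of this type. */
  Double getUtilization(StorageTypeSketch t) {
    final long capacity = capacities.getOrDefault(t, 0L);
    return capacity == 0L ? null : used.getOrDefault(t, 0L) * 100.0 / capacity;
  }

  public static void main(String[] args) {
    UtilizationSketch u = new UtilizationSketch();
    u.accumulate(StorageTypeSketch.DISK, 1000L, 250L);
    System.out.println(u.getUtilization(StorageTypeSketch.DISK)); // 25.0
    System.out.println(u.getUtilization(StorageTypeSketch.SSD));  // null
  }
}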
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Tue Aug 19 23:49:39 2014
@@ -17,115 +17,102 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;

+import java.io.Closeable;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.InetAddress;
 import java.net.URI;
-import java.util.EnumSet;

 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
-import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Daemon;

 /**
- * The class provides utilities for {@link Balancer} to access a NameNode
+ * The class provides utilities for accessing a NameNode.
  */
 @InterfaceAudience.Private
-class NameNodeConnector {
-  private static final Log LOG = Balancer.LOG;
-  private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+public class NameNodeConnector implements Closeable {
+  private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
+
+  private static final int MAX_NOT_CHANGED_ITERATIONS = 5;

-  final URI nameNodeUri;
-  final String blockpoolID;
+  private final URI nameNodeUri;
+  private final String blockpoolID;
+
+  private final NamenodeProtocol namenode;
+  private final ClientProtocol client;
+  private final KeyManager keyManager;
+
+  private final FileSystem fs;
+  private final Path idPath;
+  private final OutputStream out;

-  final NamenodeProtocol namenode;
-  final ClientProtocol client;
-  final FileSystem fs;
-  final OutputStream out;
-
-  private final boolean isBlockTokenEnabled;
-  private final boolean encryptDataTransfer;
-  private boolean shouldRun;
-  private long keyUpdaterInterval;
-  // used for balancer
   private int notChangedIterations = 0;
-  private BlockTokenSecretManager blockTokenSecretManager;
-  private Daemon keyupdaterthread; // AccessKeyUpdater thread
-  private DataEncryptionKey encryptionKey;
-  private final TrustedChannelResolver trustedChannelResolver;

-  NameNodeConnector(URI nameNodeUri,
+  public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
       Configuration conf) throws IOException {
     this.nameNodeUri = nameNodeUri;
+    this.idPath = idPath;

-    this.namenode =
-        NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class)
-        .getProxy();
-    this.client =
-        NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
-        .getProxy();
+    this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
+        NamenodeProtocol.class).getProxy();
+    this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
+        ClientProtocol.class).getProxy();
     this.fs = FileSystem.get(nameNodeUri, conf);

     final NamespaceInfo namespaceinfo = namenode.versionRequest();
     this.blockpoolID = namespaceinfo.getBlockPoolID();

-    final ExportedBlockKeys keys = namenode.getBlockKeys();
-    this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
-    if (isBlockTokenEnabled) {
-      long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
-      long blockTokenLifetime = keys.getTokenLifetime();
-      LOG.info("Block token params received from NN: keyUpdateInterval="
-          + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
-          + blockTokenLifetime / (60 * 1000) + " min(s)");
-      String encryptionAlgorithm = conf.get(
-          DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
-      this.blockTokenSecretManager = new BlockTokenSecretManager(
-          blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
-          encryptionAlgorithm);
-      this.blockTokenSecretManager.addKeys(keys);
-      /*
-       * Balancer should sync its block keys with NN more frequently than NN
-       * updates its block keys
-       */
-      this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
-      LOG.info("Balancer will update its block keys every "
-          + keyUpdaterInterval / (60 * 1000) + " minute(s)");
-      this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
-      this.shouldRun = true;
-      this.keyupdaterthread.start();
-    }
-    this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
-        .getEncryptDataTransfer();
-    // Check if there is another balancer running.
+    final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
+    this.keyManager = new KeyManager(blockpoolID, namenode,
+        defaults.getEncryptDataTransfer(), conf);

     // Exit if there is another one running.
-    out = checkAndMarkRunningBalancer();
+    out = checkAndMarkRunning();
     if (out == null) {
-      throw new IOException("Another balancer is running");
+      throw new IOException("Another " + name + " is running.");
     }
-    this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf);
   }

-  boolean shouldContinue(long dispatchBlockMoveBytes) {
+  /** @return the block pool ID */
+  public String getBlockpoolID() {
+    return blockpoolID;
+  }
+
+  /** @return blocks with locations. */
+  public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
+      throws IOException {
+    return namenode.getBlocks(datanode, size);
+  }
+
+  /** @return live datanode storage reports. */
+  public DatanodeStorageReport[] getLiveDatanodeStorageReport()
+      throws IOException {
+    return client.getDatanodeStorageReport(DatanodeReportType.LIVE);
+  }
+
+  /** @return the key manager */
+  public KeyManager getKeyManager() {
+    return keyManager;
+  }
+
+  /** Should the instance continue running? */
+  public boolean shouldContinue(long dispatchBlockMoveBytes) {
     if (dispatchBlockMoveBytes > 0) {
       notChangedIterations = 0;
     } else {
@@ -139,53 +126,25 @@ class NameNodeConnector {
     return true;
   }

-  /** Get an access token for a block. */
-  Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
-      ) throws IOException {
-    if (!isBlockTokenEnabled) {
-      return BlockTokenSecretManager.DUMMY_TOKEN;
-    } else {
-      if (!shouldRun) {
-        throw new IOException(
-            "Can not get access token. BlockKeyUpdater is not running");
-      }
-      return blockTokenSecretManager.generateToken(null, eb,
-          EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
-              BlockTokenSecretManager.AccessMode.COPY));
-    }
-  }
-
-  DataEncryptionKey getDataEncryptionKey()
-      throws IOException {
-    if (encryptDataTransfer && !this.trustedChannelResolver.isTrusted()) {
-      synchronized (this) {
-        if (encryptionKey == null) {
-          encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
-        }
-        return encryptionKey;
-      }
-    } else {
-      return null;
-    }
-  }
-
-  /* The idea for making sure that there is no more than one balancer
-   * running in an HDFS is to create a file in the HDFS, writes the IP address
-   * of the machine on which the balancer is running to the file, but did not
-   * close the file until the balancer exits.
-   * This prevents the second balancer from running because it can not
+  /**
+   * The idea for making sure that there is no more than one instance
+   * running in an HDFS is to create a file in HDFS, write the hostname
+   * of the machine on which the instance is running into the file, and
+   * keep the file open until the instance exits.
+   *
+   * This prevents a second instance from running because it cannot
    * create the file while the first one is running.
   *
-   * This method checks if there is any running balancer and
-   * if no, mark yes if no.
+   * This method checks whether there is a running instance and, if there
+   * is none, marks this instance as the running one.
   * Note that this is an atomic operation.
   *
-   * Return null if there is a running balancer; otherwise the output stream
-   * to the newly created file.
+   * @return null if there is a running instance;
+   *         otherwise, the output stream to the newly created file.
   */
-  private OutputStream checkAndMarkRunningBalancer() throws IOException {
+  private OutputStream checkAndMarkRunning() throws IOException {
     try {
-      final DataOutputStream out = fs.create(BALANCER_ID_PATH);
+      final DataOutputStream out = fs.create(idPath);
       out.writeBytes(InetAddress.getLocalHost().getHostName());
       out.flush();
       return out;
@@ -198,24 +157,17 @@ class NameNodeConnector {
     }
   }

-  /** Close the connection. */
-  void close() {
-    shouldRun = false;
-    try {
-      if (keyupdaterthread != null) {
-        keyupdaterthread.interrupt();
-      }
-    } catch(Exception e) {
-      LOG.warn("Exception shutting down access key updater thread", e);
-    }
+  @Override
+  public void close() {
+    keyManager.close();

     // close the output file
     IOUtils.closeStream(out);
     if (fs != null) {
       try {
-        fs.delete(BALANCER_ID_PATH, true);
+        fs.delete(idPath, true);
       } catch(IOException ioe) {
-        LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe);
+        LOG.warn("Failed to delete " + idPath, ioe);
       }
     }
   }
@@ -223,31 +175,6 @@ class NameNodeConnector {
   @Override
   public String toString() {
     return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
-        + ", id=" + blockpoolID
-        + "]";
-  }
-
-  /**
-   * Periodically updates access keys.
-   */
-  class BlockKeyUpdater implements Runnable {
-    @Override
-    public void run() {
-      try {
-        while (shouldRun) {
-          try {
-            blockTokenSecretManager.addKeys(namenode.getBlockKeys());
-          } catch (IOException e) {
-            LOG.error("Failed to set keys", e);
-          }
-          Thread.sleep(keyUpdaterInterval);
-        }
-      } catch (InterruptedException e) {
-        LOG.debug("InterruptedException in block key updater thread", e);
-      } catch (Throwable e) {
-        LOG.error("Exception in block key updater thread", e);
-        shouldRun = false;
-      }
-    }
+        + ", bpid=" + blockpoolID + "]";
   }
 }
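The generalized checkAndMarkRunning() relies on HDFS file creation being atomic: the first instance gets the output stream and holds the lease on idPath, while a concurrent instance sees AlreadyBeingCreatedException wrapped in a RemoteException. A sketch of the same guard, assuming a reachable HDFS; the lock path and class name here are hypothetical, not ones used by Hadoop:

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;

// Sketch of the single-instance guard; /system/sketch.id is a hypothetical
// lock path, not one used by Hadoop.
public class SingleInstanceSketch {
  static OutputStream tryMarkRunning(FileSystem fs, Path idPath)
      throws IOException {
    try {
      // create() fails if another instance holds a lease on the file.
      final FSDataOutputStream out = fs.create(idPath);
      out.writeBytes(InetAddress.getLocalHost().getHostName());
      out.flush();
      return out; // keep the stream open for the lifetime of this instance
    } catch (RemoteException e) {
      if (AlreadyBeingCreatedException.class.getName()
          .equals(e.getClassName())) {
        return null; // another instance is already running
      }
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    final FileSystem fs = FileSystem.get(new Configuration());
    final OutputStream out = tryMarkRunning(fs, new Path("/system/sketch.id"));
    System.out.println(out == null ? "another instance is running"
        : "this instance holds the lock");
  }
}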
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Aug 19 23:49:39 2014
@@ -21,7 +21,6 @@ import java.util.LinkedList;

 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.util.LightWeightGSet;

@@ -195,24 +194,12 @@ public class BlockInfo extends Block imp
    * Add a {@link DatanodeStorageInfo} location for a block
    */
   boolean addStorage(DatanodeStorageInfo storage) {
-    boolean added = true;
-    int idx = findDatanode(storage.getDatanodeDescriptor());
-    if(idx >= 0) {
-      if (getStorageInfo(idx) == storage) { // the storage is already there
-        return false;
-      } else {
-        // The block is on the DN but belongs to a different storage.
-        // Update our state.
-        removeStorage(storage);
-        added = false; // Just updating storage. Return false.
-      }
-    }
     // find the last null node
     int lastNode = ensureCapacity(1);
     setStorageInfo(lastNode, storage);
     setNext(lastNode, null);
     setPrevious(lastNode, null);
-    return added;
+    return true;
   }

   /**
@@ -239,40 +226,39 @@ public class BlockInfo extends Block imp

   /**
    * Find specified DatanodeDescriptor.
-   * @param dn
-   * @return index or -1 if not found.
+   * @return true if the given datanode is found; otherwise, false.
    */
-  int findDatanode(DatanodeDescriptor dn) {
+  boolean findDatanode(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeDescriptor cur = getDatanode(idx);
-      if(cur == dn)
-        return idx;
-      if(cur == null)
+      if(cur == dn) {
+        return true;
+      }
+      if(cur == null) {
         break;
+      }
     }
-    return -1;
+    return false;
   }

   /**
    * Find specified DatanodeStorageInfo.
-   * @param dn
-   * @return index or -1 if not found.
+   * @return DatanodeStorageInfo or null if not found.
    */
-  int findStorageInfo(DatanodeInfo dn) {
+  DatanodeStorageInfo findStorageInfo(DatanodeDescriptor dn) {
     int len = getCapacity();
     for(int idx = 0; idx < len; idx++) {
       DatanodeStorageInfo cur = getStorageInfo(idx);
       if(cur == null)
         break;
       if(cur.getDatanodeDescriptor() == dn)
-        return idx;
+        return cur;
     }
-    return -1;
+    return null;
   }

   /**
    * Find specified DatanodeStorageInfo.
-   * @param storageInfo
    * @return index or -1 if not found.
    */
   int findStorageInfo(DatanodeStorageInfo storageInfo) {

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java Tue Aug 19 23:49:39 2014
@@ -373,12 +373,14 @@ public class BlockInfoUnderConstruction
     sb.append("{blockUCState=").append(blockUCState)
       .append(", primaryNodeIndex=").append(primaryNodeIndex)
       .append(", replicas=[");
-    Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
-    if (iter.hasNext()) {
-      iter.next().appendStringTo(sb);
-      while (iter.hasNext()) {
-        sb.append(", ");
+    if (replicas != null) {
+      Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
+      if (iter.hasNext()) {
         iter.next().appendStringTo(sb);
+        while (iter.hasNext()) {
+          sb.append(", ");
+          iter.next().appendStringTo(sb);
+        }
       }
     }
     sb.append("]}");
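The BlockInfoUnderConstruction change wraps the existing comma-join in a replicas != null guard, so toString() no longer throws NullPointerException for a block whose replica list was never populated. The join idiom itself, reduced to a minimal standalone sketch:

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Minimal sketch of the null-guarded "first element, then ', ' + element"
// join used in BlockInfoUnderConstruction.toString().
public class JoinSketch {
  static String join(List<String> items) {
    final StringBuilder sb = new StringBuilder("[");
    if (items != null) {
      final Iterator<String> iter = items.iterator();
      if (iter.hasNext()) {
        sb.append(iter.next());
        while (iter.hasNext()) {
          sb.append(", ").append(iter.next());
        }
      }
    }
    return sb.append("]").toString();
  }

  public static void main(String[] args) {
    System.out.println(join(Arrays.asList("r1", "r2"))); // [r1, r2]
    System.out.println(join(null));                      // []
  }
}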