Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Tue Aug 12 17:02:07 2014 @@ -18,19 +18,9 @@ package org.apache.hadoop.hdfs.server.balancer; import static com.google.common.base.Preconditions.checkArgument; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; -import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; - -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; + import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintStream; -import java.net.Socket; import java.net.URI; import java.text.DateFormat; import java.util.ArrayList; @@ -38,20 +28,11 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Date; -import java.util.EnumMap; import java.util.Formatter; -import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -63,31 +44,16 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.StorageType; -import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; -import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; +import 
org.apache.hadoop.hdfs.server.balancer.Dispatcher.Task; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Util; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; -import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; -import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -200,15 +166,7 @@ public class Balancer { private static final long GB = 1L << 30; //1GB private static final long MAX_SIZE_TO_MOVE = 10*GB; - private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; - /** The maximum number of concurrent blocks moves for - * balancing purpose at a datanode - */ - private static final int MAX_NO_PENDING_BLOCK_ITERATIONS = 5; - public static final long DELAY_AFTER_ERROR = 10 * 1000L; //10 seconds - public static final int BLOCK_MOVE_READ_TIMEOUT=20*60*1000; // 20 minutes - private static final String USAGE = "Usage: java " + Balancer.class.getSimpleName() + "\n\t[-policy <policy>]\tthe balancing policy: " @@ -220,652 +178,17 @@ public class Balancer { + "\n\t[-include [-f <hosts-file> | comma-sperated list of hosts]]" + "\tIncludes only the specified datanodes."; - private final NameNodeConnector nnc; - private final KeyManager keyManager; - + private final Dispatcher dispatcher; private final BalancingPolicy policy; - private final SaslDataTransferClient saslClient; private final double threshold; - // set of data nodes to be excluded from balancing operations. - Set<String> nodesToBeExcluded; - //Restrict balancing to the following nodes. 
- Set<String> nodesToBeIncluded; // all data node lists private final Collection<Source> overUtilized = new LinkedList<Source>(); private final Collection<Source> aboveAvgUtilized = new LinkedList<Source>(); - private final Collection<BalancerDatanode.StorageGroup> belowAvgUtilized - = new LinkedList<BalancerDatanode.StorageGroup>(); - private final Collection<BalancerDatanode.StorageGroup> underUtilized - = new LinkedList<BalancerDatanode.StorageGroup>(); - - private final Collection<Source> sources = new HashSet<Source>(); - private final Collection<BalancerDatanode.StorageGroup> targets - = new HashSet<BalancerDatanode.StorageGroup>(); - - private final Map<Block, BalancerBlock> globalBlockList - = new HashMap<Block, BalancerBlock>(); - private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks; - - /** Map (datanodeUuid,storageType -> StorageGroup) */ - private final StorageGroupMap storageGroupMap = new StorageGroupMap(); - - private NetworkTopology cluster; - - private final ExecutorService moverExecutor; - private final ExecutorService dispatcherExecutor; - private final int maxConcurrentMovesPerNode; - - - private static class StorageGroupMap { - private static String toKey(String datanodeUuid, StorageType storageType) { - return datanodeUuid + ":" + storageType; - } - - private final Map<String, BalancerDatanode.StorageGroup> map - = new HashMap<String, BalancerDatanode.StorageGroup>(); - - BalancerDatanode.StorageGroup get(String datanodeUuid, StorageType storageType) { - return map.get(toKey(datanodeUuid, storageType)); - } - - void put(BalancerDatanode.StorageGroup g) { - final String key = toKey(g.getDatanode().getDatanodeUuid(), g.storageType); - final BalancerDatanode.StorageGroup existing = map.put(key, g); - Preconditions.checkState(existing == null); - } - - int size() { - return map.size(); - } - - void clear() { - map.clear(); - } - } - /* This class keeps track of a scheduled block move */ - private class PendingBlockMove { - private BalancerBlock block; - private Source source; - private BalancerDatanode proxySource; - private BalancerDatanode.StorageGroup target; - - /** constructor */ - private PendingBlockMove() { - } - - @Override - public String toString() { - final Block b = block.getBlock(); - return b + " with size=" + b.getNumBytes() + " from " - + source.getDisplayName() + " to " + target.getDisplayName() - + " through " + proxySource.datanode; - } - - /* choose a block & a proxy source for this pendingMove - * whose source & target have already been chosen. - * - * Return true if a block and its proxy are chosen; false otherwise - */ - private boolean chooseBlockAndProxy() { - // iterate all source's blocks until find a good one - for (Iterator<BalancerBlock> blocks= - source.getBlockIterator(); blocks.hasNext();) { - if (markMovedIfGoodBlock(blocks.next())) { - blocks.remove(); - return true; - } - } - return false; - } - - /* Return true if the given block is good for the tentative move; - * If it is good, add it to the moved list to marked as "Moved". - * A block is good if - * 1. it is a good candidate; see isGoodBlockCandidate - * 2. 
can find a proxy source that's not busy for this move - */ - private boolean markMovedIfGoodBlock(BalancerBlock block) { - synchronized(block) { - synchronized(movedBlocks) { - if (isGoodBlockCandidate(source, target, block)) { - this.block = block; - if ( chooseProxySource() ) { - movedBlocks.put(block); - if (LOG.isDebugEnabled()) { - LOG.debug("Decided to move " + this); - } - return true; - } - } - } - } - return false; - } - - /* Now we find out source, target, and block, we need to find a proxy - * - * @return true if a proxy is found; otherwise false - */ - private boolean chooseProxySource() { - final DatanodeInfo targetDN = target.getDatanode(); - // if node group is supported, first try add nodes in the same node group - if (cluster.isNodeGroupAware()) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameNodeGroup(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - } - // check if there is replica which is on the same rack with the target - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), targetDN) && addTo(loc)) { - return true; - } - } - // find out a non-busy replica - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (addTo(loc)) { - return true; - } - } - return false; - } - - /** add to a proxy source for specific block movement */ - private boolean addTo(BalancerDatanode.StorageGroup g) { - final BalancerDatanode bdn = g.getBalancerDatanode(); - if (bdn.addPendingBlock(this)) { - proxySource = bdn; - return true; - } - return false; - } - - /* Dispatch the block move task to the proxy source & wait for the response - */ - private void dispatch() { - Socket sock = new Socket(); - DataOutputStream out = null; - DataInputStream in = null; - try { - sock.connect( - NetUtils.createSocketAddr(target.getDatanode().getXferAddr()), - HdfsServerConstants.READ_TIMEOUT); - /* Unfortunately we don't have a good way to know if the Datanode is - * taking a really long time to move a block, OR something has - * gone wrong and it's never going to finish. To deal with this - * scenario, we set a long timeout (20 minutes) to avoid hanging - * the balancer indefinitely. - */ - sock.setSoTimeout(BLOCK_MOVE_READ_TIMEOUT); - - sock.setKeepAlive(true); - - OutputStream unbufOut = sock.getOutputStream(); - InputStream unbufIn = sock.getInputStream(); - ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); - Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb); - IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, keyManager, accessToken, target.getDatanode()); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - in = new DataInputStream(new BufferedInputStream(unbufIn, - HdfsConstants.IO_FILE_BUFFER_SIZE)); - - sendRequest(out, eb, StorageType.DEFAULT, accessToken); - receiveResponse(in); - bytesMoved.addAndGet(block.getNumBytes()); - LOG.info("Successfully moved " + this); - } catch (IOException e) { - LOG.warn("Failed to move " + this + ": " + e.getMessage()); - /* proxy or target may have an issue, insert a small delay - * before using these nodes further. This avoids a potential storm - * of "threads quota exceeded" Warnings when the balancer - * gets out of sync with work going on in datanode. 
- */ - proxySource.activateDelay(DELAY_AFTER_ERROR); - target.getBalancerDatanode().activateDelay(DELAY_AFTER_ERROR); - } finally { - IOUtils.closeStream(out); - IOUtils.closeStream(in); - IOUtils.closeSocket(sock); - - proxySource.removePendingBlock(this); - target.getBalancerDatanode().removePendingBlock(this); - - synchronized (this ) { - reset(); - } - synchronized (Balancer.this) { - Balancer.this.notifyAll(); - } - } - } - - /* Send a block replace request to the output stream*/ - private void sendRequest(DataOutputStream out, ExtendedBlock eb, - StorageType storageType, - Token<BlockTokenIdentifier> accessToken) throws IOException { - new Sender(out).replaceBlock(eb, storageType, accessToken, - source.getDatanode().getDatanodeUuid(), proxySource.datanode); - } - - /* Receive a block copy response from the input stream */ - private void receiveResponse(DataInputStream in) throws IOException { - BlockOpResponseProto response = BlockOpResponseProto.parseFrom( - vintPrefixed(in)); - if (response.getStatus() != Status.SUCCESS) { - if (response.getStatus() == Status.ERROR_ACCESS_TOKEN) - throw new IOException("block move failed due to access token error"); - throw new IOException("block move is failed: " + - response.getMessage()); - } - } - - /* reset the object */ - private void reset() { - block = null; - source = null; - proxySource = null; - target = null; - } - - /* start a thread to dispatch the block move */ - private void scheduleBlockMove() { - moverExecutor.execute(new Runnable() { - @Override - public void run() { - if (LOG.isDebugEnabled()) { - LOG.debug("Start moving " + PendingBlockMove.this); - } - dispatch(); - } - }); - } - } - - /* A class for keeping track of blocks in the Balancer */ - static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> { - BalancerBlock(Block block) { - super(block); - } - } - - /* The class represents a desired move of bytes between two nodes - * and the target. - * An object of this class is stored in a source. - */ - static private class Task { - private final BalancerDatanode.StorageGroup target; - private long size; //bytes scheduled to move - - /* constructor */ - private Task(BalancerDatanode.StorageGroup target, long size) { - this.target = target; - this.size = size; - } - } - - - /* A class that keeps track of a datanode in Balancer */ - private static class BalancerDatanode { - - /** A group of storages in a datanode with the same storage type. */ - private class StorageGroup { - final StorageType storageType; - final double utilization; - final long maxSize2Move; - private long scheduledSize = 0L; - - private StorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - this.storageType = storageType; - this.utilization = utilization; - this.maxSize2Move = maxSize2Move; - } - - BalancerDatanode getBalancerDatanode() { - return BalancerDatanode.this; - } - - DatanodeInfo getDatanode() { - return BalancerDatanode.this.datanode; - } - - /** Decide if still need to move more bytes */ - protected synchronized boolean hasSpaceForScheduling() { - return availableSizeToMove() > 0L; - } - - /** @return the total number of bytes that need to be moved */ - synchronized long availableSizeToMove() { - return maxSize2Move - scheduledSize; - } - - /** increment scheduled size */ - synchronized void incScheduledSize(long size) { - scheduledSize += size; - } - - /** @return scheduled size */ - synchronized long getScheduledSize() { - return scheduledSize; - } - - /** Reset scheduled size to zero. 
*/ - synchronized void resetScheduledSize() { - scheduledSize = 0L; - } - - /** @return the name for display */ - String getDisplayName() { - return datanode + ":" + storageType; - } - - @Override - public String toString() { - return "" + utilization; - } - } - - final DatanodeInfo datanode; - final EnumMap<StorageType, StorageGroup> storageMap - = new EnumMap<StorageType, StorageGroup>(StorageType.class); - protected long delayUntil = 0L; - // blocks being moved but not confirmed yet - private final List<PendingBlockMove> pendingBlocks; - private final int maxConcurrentMoves; - - @Override - public String toString() { - return getClass().getSimpleName() + ":" + datanode + ":" + storageMap; - } - - /* Constructor - * Depending on avgutil & threshold, calculate maximum bytes to move - */ - private BalancerDatanode(DatanodeStorageReport report, - double threshold, int maxConcurrentMoves) { - this.datanode = report.getDatanodeInfo(); - this.maxConcurrentMoves = maxConcurrentMoves; - this.pendingBlocks = new ArrayList<PendingBlockMove>(maxConcurrentMoves); - } - - private void put(StorageType storageType, StorageGroup g) { - final StorageGroup existing = storageMap.put(storageType, g); - Preconditions.checkState(existing == null); - } - - StorageGroup addStorageGroup(StorageType storageType, double utilization, - long maxSize2Move) { - final StorageGroup g = new StorageGroup(storageType, utilization, - maxSize2Move); - put(storageType, g); - return g; - } - - Source addSource(StorageType storageType, double utilization, - long maxSize2Move, Balancer balancer) { - final Source s = balancer.new Source(storageType, utilization, - maxSize2Move, this); - put(storageType, s); - return s; - } - - synchronized private void activateDelay(long delta) { - delayUntil = Time.now() + delta; - } - - synchronized private boolean isDelayActive() { - if (delayUntil == 0 || Time.now() > delayUntil){ - delayUntil = 0; - return false; - } - return true; - } - - /* Check if the node can schedule more blocks to move */ - synchronized private boolean isPendingQNotFull() { - if ( pendingBlocks.size() < this.maxConcurrentMoves ) { - return true; - } - return false; - } - - /* Check if all the dispatched moves are done */ - synchronized private boolean isPendingQEmpty() { - return pendingBlocks.isEmpty(); - } - - /* Add a scheduled block move to the node */ - private synchronized boolean addPendingBlock( - PendingBlockMove pendingBlock) { - if (!isDelayActive() && isPendingQNotFull()) { - return pendingBlocks.add(pendingBlock); - } - return false; - } - - /* Remove a scheduled block move from the node */ - private synchronized boolean removePendingBlock( - PendingBlockMove pendingBlock) { - return pendingBlocks.remove(pendingBlock); - } - } - - /** A node that can be the sources of a block move */ - private class Source extends BalancerDatanode.StorageGroup { - - /* A thread that initiates a block move - * and waits for block move to complete */ - private class BlockMoveDispatcher implements Runnable { - @Override - public void run() { - dispatchBlocks(); - } - } - - private final List<Task> tasks = new ArrayList<Task>(2); - private long blocksToReceive = 0L; - /* source blocks point to balancerBlocks in the global list because - * we want to keep one copy of a block in balancer and be aware that - * the locations are changing over time. 
- */ - private final List<BalancerBlock> srcBlockList - = new ArrayList<BalancerBlock>(); - - /* constructor */ - private Source(StorageType storageType, double utilization, - long maxSize2Move, BalancerDatanode dn) { - dn.super(storageType, utilization, maxSize2Move); - } - - /** Add a task */ - private void addTask(Task task) { - Preconditions.checkState(task.target != this, - "Source and target are the same storage group " + getDisplayName()); - incScheduledSize(task.size); - tasks.add(task); - } - - /* Return an iterator to this source's blocks */ - private Iterator<BalancerBlock> getBlockIterator() { - return srcBlockList.iterator(); - } - - /* fetch new blocks of this source from namenode and - * update this source's block list & the global block list - * Return the total size of the received blocks in the number of bytes. - */ - private long getBlockList() throws IOException { - final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks( - getDatanode(), size).getBlocks(); - - long bytesReceived = 0; - for (BlockWithLocations blk : newBlocks) { - bytesReceived += blk.getBlock().getNumBytes(); - BalancerBlock block; - synchronized(globalBlockList) { - block = globalBlockList.get(blk.getBlock()); - if (block==null) { - block = new BalancerBlock(blk.getBlock()); - globalBlockList.put(blk.getBlock(), block); - } else { - block.clearLocations(); - } - - synchronized (block) { - // update locations - final String[] datanodeUuids = blk.getDatanodeUuids(); - final StorageType[] storageTypes = blk.getStorageTypes(); - for (int i = 0; i < datanodeUuids.length; i++) { - final BalancerDatanode.StorageGroup g = storageGroupMap.get( - datanodeUuids[i], storageTypes[i]); - if (g != null) { // not unknown - block.addLocation(g); - } - } - } - if (!srcBlockList.contains(block) && isGoodBlockCandidate(block)) { - // filter bad candidates - srcBlockList.add(block); - } - } - } - return bytesReceived; - } - - /* Decide if the given block is a good candidate to move or not */ - private boolean isGoodBlockCandidate(BalancerBlock block) { - for (Task t : tasks) { - if (Balancer.this.isGoodBlockCandidate(this, t.target, block)) { - return true; - } - } - return false; - } - - /* Return a block that's good for the source thread to dispatch immediately - * The block's source, target, and proxy source are determined too. - * When choosing proxy and target, source & target throttling - * has been considered. They are chosen only when they have the capacity - * to support this block move. - * The block should be dispatched immediately after this method is returned. 
- */ - private PendingBlockMove chooseNextBlockToMove() { - for (Iterator<Task> i = tasks.iterator(); i.hasNext();) { - final Task task = i.next(); - final BalancerDatanode target = task.target.getBalancerDatanode(); - PendingBlockMove pendingBlock = new PendingBlockMove(); - if (target.addPendingBlock(pendingBlock)) { - // target is not busy, so do a tentative block allocation - pendingBlock.source = this; - pendingBlock.target = task.target; - if ( pendingBlock.chooseBlockAndProxy() ) { - long blockSize = pendingBlock.block.getNumBytes(); - incScheduledSize(-blockSize); - task.size -= blockSize; - if (task.size == 0) { - i.remove(); - } - return pendingBlock; - } else { - // cancel the tentative move - target.removePendingBlock(pendingBlock); - } - } - } - return null; - } - - /* iterate all source's blocks to remove moved ones */ - private void filterMovedBlocks() { - for (Iterator<BalancerBlock> blocks=getBlockIterator(); - blocks.hasNext();) { - if (movedBlocks.contains(blocks.next().getBlock())) { - blocks.remove(); - } - } - } - - private static final int SOURCE_BLOCK_LIST_MIN_SIZE=5; - /* Return if should fetch more blocks from namenode */ - private boolean shouldFetchMoreBlocks() { - return srcBlockList.size()<SOURCE_BLOCK_LIST_MIN_SIZE && - blocksToReceive>0; - } - - /* This method iteratively does the following: - * it first selects a block to move, - * then sends a request to the proxy source to start the block move - * when the source's block list falls below a threshold, it asks - * the namenode for more blocks. - * It terminates when it has dispatch enough block move tasks or - * it has received enough blocks from the namenode, or - * the elapsed time of the iteration has exceeded the max time limit. - */ - private static final long MAX_ITERATION_TIME = 20*60*1000L; //20 mins - private void dispatchBlocks() { - long startTime = Time.now(); - long scheduledSize = getScheduledSize(); - this.blocksToReceive = 2*scheduledSize; - boolean isTimeUp = false; - int noPendingBlockIteration = 0; - while(!isTimeUp && getScheduledSize()>0 && - (!srcBlockList.isEmpty() || blocksToReceive>0)) { - PendingBlockMove pendingBlock = chooseNextBlockToMove(); - if (pendingBlock != null) { - // move the block - pendingBlock.scheduleBlockMove(); - continue; - } - - /* Since we can not schedule any block to move, - * filter any moved blocks from the source block list and - * check if we should fetch more blocks from the namenode - */ - filterMovedBlocks(); // filter already moved blocks - if (shouldFetchMoreBlocks()) { - // fetch new blocks - try { - blocksToReceive -= getBlockList(); - continue; - } catch (IOException e) { - LOG.warn("Exception while getting block list", e); - return; - } - } else { - // source node cannot find a pendingBlockToMove, iteration +1 - noPendingBlockIteration++; - // in case no blocks can be moved for source node's task, - // jump out of while-loop after 5 iterations. - if (noPendingBlockIteration >= MAX_NO_PENDING_BLOCK_ITERATIONS) { - resetScheduledSize(); - } - } - - // check if time is up or not - if (Time.now()-startTime > MAX_ITERATION_TIME) { - isTimeUp = true; - continue; - } - - /* Now we can not schedule any block to move and there are - * no new blocks added to the source block list, so we wait. 
- */ - try { - synchronized(Balancer.this) { - Balancer.this.wait(1000); // wait for targets/sources to be idle - } - } catch (InterruptedException ignored) { - } - } - } - } + private final Collection<StorageGroup> belowAvgUtilized + = new LinkedList<StorageGroup>(); + private final Collection<StorageGroup> underUtilized + = new LinkedList<StorageGroup>(); /* Check that this Balancer is compatible with the Block Placement Policy * used by the Namenode. @@ -887,38 +210,26 @@ public class Balancer { * when connection fails. */ Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { - this.threshold = p.threshold; - this.policy = p.policy; - this.nodesToBeExcluded = p.nodesToBeExcluded; - this.nodesToBeIncluded = p.nodesToBeIncluded; - this.nnc = theblockpool; - this.keyManager = nnc.getKeyManager(); - final long movedWinWidth = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); - movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth); - - cluster = NetworkTopology.getInstance(conf); - - this.moverExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT)); - this.dispatcherExecutor = Executors.newFixedThreadPool( - conf.getInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, - DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT)); - this.maxConcurrentMovesPerNode = - conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + final int moverThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_DEFAULT); + final int dispatcherThreads = conf.getInt( + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, + DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT); + final int maxConcurrentMovesPerNode = conf.getInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); - this.saslClient = new SaslDataTransferClient( - DataTransferSaslUtil.getSaslPropertiesResolver(conf), - TrustedChannelResolver.getInstance(conf), - conf.getBoolean( - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, - IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + + this.dispatcher = new Dispatcher(theblockpool, p.nodesToBeIncluded, + p.nodesToBeExcluded, movedWinWidth, moverThreads, dispatcherThreads, + maxConcurrentMovesPerNode, conf); + this.threshold = p.threshold; + this.policy = p.policy; } - private static long getCapacity(DatanodeStorageReport report, StorageType t) { long capacity = 0L; for(StorageReport r : report.getStorageReports()) { @@ -939,26 +250,6 @@ public class Balancer { return remaining; } - private boolean shouldIgnore(DatanodeInfo dn) { - //ignore decommissioned nodes - final boolean decommissioned = dn.isDecommissioned(); - //ignore decommissioning nodes - final boolean decommissioning = dn.isDecommissionInProgress(); - // ignore nodes in exclude list - final boolean excluded = Util.shouldBeExcluded(nodesToBeExcluded, dn); - // ignore nodes not in the include list (if include list is not empty) - final boolean notIncluded = !Util.shouldBeIncluded(nodesToBeIncluded, dn); - - if (decommissioned || decommissioning || excluded || notIncluded) { - if (LOG.isTraceEnabled()) { - LOG.trace("Excluding datanode " + dn + ": " + decommissioned + ", " - + decommissioning + ", " + excluded + ", " + notIncluded); - } - 
return true; - } - return false; - } - /** * Given a datanode storage set, build a network topology and decide * over-utilized storages, above average utilized storages, @@ -966,16 +257,11 @@ public class Balancer { * The input datanode storage set is shuffled in order to randomize * to the storage matching later on. * - * @return the total number of bytes that are - * needed to move to make the cluster balanced. - * @param reports a set of datanode storage reports + * @return the number of bytes needed to move in order to balance the cluster. */ - private long init(DatanodeStorageReport[] reports) { + private long init(List<DatanodeStorageReport> reports) { // compute average utilization for (DatanodeStorageReport r : reports) { - if (shouldIgnore(r.getDatanodeInfo())) { - continue; - } policy.accumulateSpaces(r); } policy.initAvgUtilization(); @@ -983,15 +269,8 @@ public class Balancer { // create network topology and classify utilization collections: // over-utilized, above-average, below-average and under-utilized. long overLoadedBytes = 0L, underLoadedBytes = 0L; - for(DatanodeStorageReport r : DFSUtil.shuffle(reports)) { - final DatanodeInfo datanode = r.getDatanodeInfo(); - if (shouldIgnore(datanode)) { - continue; // ignore decommissioning or decommissioned nodes - } - cluster.add(datanode); - - final BalancerDatanode dn = new BalancerDatanode(r, underLoadedBytes, - maxConcurrentMovesPerNode); + for(DatanodeStorageReport r : reports) { + final DDatanode dn = dispatcher.newDatanode(r); for(StorageType t : StorageType.asList()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -1004,9 +283,9 @@ public class Balancer { final long maxSize2Move = computeMaxSize2Move(capacity, getRemaining(r, t), utilizationDiff, threshold); - final BalancerDatanode.StorageGroup g; + final StorageGroup g; if (utilizationDiff > 0) { - final Source s = dn.addSource(t, utilization, maxSize2Move, this); + final Source s = dn.addSource(t, maxSize2Move, dispatcher); if (thresholdDiff <= 0) { // within threshold aboveAvgUtilized.add(s); } else { @@ -1015,7 +294,7 @@ public class Balancer { } g = s; } else { - g = dn.addStorageGroup(t, utilization, maxSize2Move); + g = dn.addStorageGroup(t, maxSize2Move); if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { @@ -1023,14 +302,15 @@ public class Balancer { underUtilized.add(g); } } - storageGroupMap.put(g); + dispatcher.getStorageGroupMap().put(g); } } logUtilizationCollections(); - Preconditions.checkState(storageGroupMap.size() == overUtilized.size() - + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(), + Preconditions.checkState(dispatcher.getStorageGroupMap().size() + == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size() + + belowAvgUtilized.size(), "Mismatched number of storage groups"); // return number of bytes to be moved in order to make the cluster balanced @@ -1063,7 +343,7 @@ public class Balancer { logUtilizationCollection("underutilized", underUtilized); } - private static <T extends BalancerDatanode.StorageGroup> + private static <T extends StorageGroup> void logUtilizationCollection(String name, Collection<T> items) { LOG.info(items.size() + " " + name + ": " + items); } @@ -1077,7 +357,7 @@ public class Balancer { */ private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware - if (cluster.isNodeGroupAware()) { + if 
(dispatcher.getCluster().isNodeGroupAware()) { chooseStorageGroups(Matcher.SAME_NODE_GROUP); } @@ -1086,15 +366,7 @@ public class Balancer { // At last, match all remaining nodes chooseStorageGroups(Matcher.ANY_OTHER); - Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), - "Mismatched number of datanodes (" + storageGroupMap.size() + " < " - + sources.size() + " sources, " + targets.size() + " targets)"); - - long bytesToMove = 0L; - for (Source src : sources) { - bytesToMove += src.getScheduledSize(); - } - return bytesToMove; + return dispatcher.bytesToMove(); } /** Decide all <source, target> pairs according to the matcher. */ @@ -1124,8 +396,7 @@ public class Balancer { * datanodes or the candidates are source nodes with (utilization > Avg), and * the others are target nodes with (utilization < Avg). */ - private <G extends BalancerDatanode.StorageGroup, - C extends BalancerDatanode.StorageGroup> + private <G extends StorageGroup, C extends StorageGroup> void chooseStorageGroups(Collection<G> groups, Collection<C> candidates, Matcher matcher) { for(final Iterator<G> i = groups.iterator(); i.hasNext();) { @@ -1141,9 +412,8 @@ public class Balancer { * For the given datanode, choose a candidate and then schedule it. * @return true if a candidate is chosen; false if no candidates is chosen. */ - private <C extends BalancerDatanode.StorageGroup> - boolean choose4One(BalancerDatanode.StorageGroup g, - Collection<C> candidates, Matcher matcher) { + private <C extends StorageGroup> boolean choose4One(StorageGroup g, + Collection<C> candidates, Matcher matcher) { final Iterator<C> i = candidates.iterator(); final C chosen = chooseCandidate(g, i, matcher); @@ -1161,28 +431,26 @@ public class Balancer { return true; } - private void matchSourceWithTargetToMove(Source source, - BalancerDatanode.StorageGroup target) { + private void matchSourceWithTargetToMove(Source source, StorageGroup target) { long size = Math.min(source.availableSizeToMove(), target.availableSizeToMove()); final Task task = new Task(target, size); source.addTask(task); - target.incScheduledSize(task.size); - sources.add(source); - targets.add(target); + target.incScheduledSize(task.getSize()); + dispatcher.add(source, target); LOG.info("Decided to move "+StringUtils.byteDesc(size)+" bytes from " + source.getDisplayName() + " to " + target.getDisplayName()); } /** Choose a candidate for the given datanode. */ - private <G extends BalancerDatanode.StorageGroup, - C extends BalancerDatanode.StorageGroup> + private <G extends StorageGroup, C extends StorageGroup> C chooseCandidate(G g, Iterator<C> candidates, Matcher matcher) { if (g.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { final C c = candidates.next(); if (!c.hasSpaceForScheduling()) { candidates.remove(); - } else if (matcher.match(cluster, g.getDatanode(), c.getDatanode())) { + } else if (matcher.match(dispatcher.getCluster(), + g.getDatanodeInfo(), c.getDatanodeInfo())) { return c; } } @@ -1190,203 +458,25 @@ public class Balancer { return null; } - private final AtomicLong bytesMoved = new AtomicLong(); - - /* Start a thread to dispatch block moves for each source. - * The thread selects blocks to move & sends request to proxy source to - * initiate block move. The process is flow controlled. Block selection is - * blocked if there are too many un-confirmed block moves. - * Return the total number of bytes successfully moved in this iteration. 
- */ - private long dispatchBlockMoves() throws InterruptedException { - long bytesLastMoved = bytesMoved.get(); - Future<?>[] futures = new Future<?>[sources.size()]; - int i=0; - for (Source source : sources) { - futures[i++] = dispatcherExecutor.submit(source.new BlockMoveDispatcher()); - } - - // wait for all dispatcher threads to finish - for (Future<?> future : futures) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.warn("Dispatcher thread failed", e.getCause()); - } - } - - // wait for all block moving to be done - waitForMoveCompletion(); - - return bytesMoved.get()-bytesLastMoved; - } - - // The sleeping period before checking if block move is completed again - static private long blockMoveWaitTime = 30000L; - - /** set the sleeping period for block move completion check */ - static void setBlockMoveWaitTime(long time) { - blockMoveWaitTime = time; - } - - /* wait for all block move confirmations - * by checking each target's pendingMove queue - */ - private void waitForMoveCompletion() { - boolean shouldWait; - do { - shouldWait = false; - for (BalancerDatanode.StorageGroup target : targets) { - if (!target.getBalancerDatanode().isPendingQEmpty()) { - shouldWait = true; - break; - } - } - if (shouldWait) { - try { - Thread.sleep(blockMoveWaitTime); - } catch (InterruptedException ignored) { - } - } - } while (shouldWait); - } - - /* Decide if it is OK to move the given block from source to target - * A block is a good candidate if - * 1. the block is not in the process of being moved/has not been moved; - * 2. the block does not have a replica on the target; - * 3. doing the move does not reduce the number of racks that the block has - */ - private boolean isGoodBlockCandidate(Source source, - BalancerDatanode.StorageGroup target, BalancerBlock block) { - if (source.storageType != target.storageType) { - return false; - } - // check if the block is moved or not - if (movedBlocks.contains(block.getBlock())) { - return false; - } - if (block.isLocatedOn(target)) { - return false; - } - if (cluster.isNodeGroupAware() && - isOnSameNodeGroupWithReplicas(target, block, source)) { - return false; - } - - boolean goodBlock = false; - if (cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { - // good if source and target are on the same rack - goodBlock = true; - } else { - boolean notOnSameRack = true; - synchronized (block) { - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { - notOnSameRack = false; - break; - } - } - } - if (notOnSameRack) { - // good if target is target is not on the same rack as any replica - goodBlock = true; - } else { - // good if source is on the same rack as on of the replicas - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (loc != source && - cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) { - goodBlock = true; - break; - } - } - } - } - return goodBlock; - } - - /** - * Check if there are any replica (other than source) on the same node group - * with target. If true, then target is not a good candidate for placing - * specific block replica as we don't want 2 replicas under the same nodegroup - * after balance. 
- * @param target targetDataNode - * @param block dataBlock - * @param source sourceDataNode - * @return true if there are any replica (other than source) on the same node - * group with target - */ - private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target, - BalancerBlock block, Source source) { - final DatanodeInfo targetDn = target.getDatanode(); - for (BalancerDatanode.StorageGroup loc : block.getLocations()) { - if (loc != source && - cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) { - return true; - } - } - return false; - } - /* reset all fields in a balancer preparing for the next iteration */ private void resetData(Configuration conf) { - this.cluster = NetworkTopology.getInstance(conf); this.overUtilized.clear(); this.aboveAvgUtilized.clear(); this.belowAvgUtilized.clear(); this.underUtilized.clear(); - this.storageGroupMap.clear(); - this.sources.clear(); - this.targets.clear(); this.policy.reset(); - cleanGlobalBlockList(); - this.movedBlocks.cleanup(); + dispatcher.reset(conf);; } - /* Remove all blocks from the global block list except for the ones in the - * moved list. - */ - private void cleanGlobalBlockList() { - for (Iterator<Block> globalBlockListIterator=globalBlockList.keySet().iterator(); - globalBlockListIterator.hasNext();) { - Block block = globalBlockListIterator.next(); - if(!movedBlocks.contains(block)) { - globalBlockListIterator.remove(); - } - } - } - - // Exit status - enum ReturnStatus { - // These int values will map directly to the balancer process's exit code. - SUCCESS(0), - IN_PROGRESS(1), - ALREADY_RUNNING(-1), - NO_MOVE_BLOCK(-2), - NO_MOVE_PROGRESS(-3), - IO_EXCEPTION(-4), - ILLEGAL_ARGS(-5), - INTERRUPTED(-6); - - final int code; - - ReturnStatus(int code) { - this.code = code; - } - } - /** Run an iteration for all datanodes. */ - private ReturnStatus run(int iteration, Formatter formatter, + private ExitStatus run(int iteration, Formatter formatter, Configuration conf) { try { - /* get all live datanodes of a cluster and their disk usage - * decide the number of bytes need to be moved - */ - final long bytesLeftToMove = init( - nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE)); + final List<DatanodeStorageReport> reports = dispatcher.init(); + final long bytesLeftToMove = init(reports); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); - return ReturnStatus.SUCCESS; + return ExitStatus.SUCCESS; } else { LOG.info( "Need to move "+ StringUtils.byteDesc(bytesLeftToMove) + " to make the cluster balanced." ); @@ -1400,7 +490,7 @@ public class Balancer { final long bytesToMove = chooseStorageGroups(); if (bytesToMove == 0) { System.out.println("No block can be moved. Exiting..."); - return ReturnStatus.NO_MOVE_BLOCK; + return ExitStatus.NO_MOVE_BLOCK; } else { LOG.info( "Will move " + StringUtils.byteDesc(bytesToMove) + " in this iteration"); @@ -1409,7 +499,7 @@ public class Balancer { formatter.format("%-24s %10d %19s %18s %17s%n", DateFormat.getDateTimeInstance().format(new Date()), iteration, - StringUtils.byteDesc(bytesMoved.get()), + StringUtils.byteDesc(dispatcher.getBytesMoved()), StringUtils.byteDesc(bytesLeftToMove), StringUtils.byteDesc(bytesToMove) ); @@ -1420,24 +510,22 @@ public class Balancer { * available to move. * Exit no byte has been moved for 5 consecutive iterations. 
*/ - if (!this.nnc.shouldContinue(dispatchBlockMoves())) { - return ReturnStatus.NO_MOVE_PROGRESS; + if (!dispatcher.dispatchAndCheckContinue()) { + return ExitStatus.NO_MOVE_PROGRESS; } - return ReturnStatus.IN_PROGRESS; + return ExitStatus.IN_PROGRESS; } catch (IllegalArgumentException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.ILLEGAL_ARGS; + return ExitStatus.ILLEGAL_ARGUMENTS; } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION; + return ExitStatus.IO_EXCEPTION; } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED; + return ExitStatus.INTERRUPTED; } finally { - // shutdown thread pools - dispatcherExecutor.shutdownNow(); - moverExecutor.shutdownNow(); + dispatcher.shutdownNow(); } } @@ -1474,14 +562,14 @@ public class Balancer { Collections.shuffle(connectors); for(NameNodeConnector nnc : connectors) { final Balancer b = new Balancer(nnc, p, conf); - final ReturnStatus r = b.run(iteration, formatter, conf); + final ExitStatus r = b.run(iteration, formatter, conf); // clean all lists b.resetData(conf); - if (r == ReturnStatus.IN_PROGRESS) { + if (r == ExitStatus.IN_PROGRESS) { done = false; - } else if (r != ReturnStatus.SUCCESS) { + } else if (r != ExitStatus.SUCCESS) { //must be an error statue, return. - return r.code; + return r.getExitCode(); } } @@ -1494,7 +582,7 @@ public class Balancer { nnc.close(); } } - return ReturnStatus.SUCCESS.code; + return ExitStatus.SUCCESS.getExitCode(); } /* Given elaspedTime in ms, return a printable string */ @@ -1546,76 +634,6 @@ public class Balancer { } } - static class Util { - - /** - * @param datanode - * @return returns true if data node is part of the excludedNodes. - */ - static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) { - return isIn(excludedNodes, datanode); - } - - /** - * @param datanode - * @return returns true if includedNodes is empty or data node is part of the includedNodes. - */ - static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) { - return (includedNodes.isEmpty() || - isIn(includedNodes, datanode)); - } - /** - * Match is checked using host name , ip address with and without port number. - * @param datanodeSet - * @param datanode - * @return true if the datanode's transfer address matches the set of nodes. 
- */ - private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) { - return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) || - isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) || - isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort()); - } - - /** - * returns true if nodes contains host or host:port - * @param nodes - * @param host - * @param port - * @return - */ - private static boolean isIn(Set<String> nodes, String host, int port) { - if (host == null) { - return false; - } - return (nodes.contains(host) || nodes.contains(host +":"+ port)); - } - - /** - * parse a comma separated string to obtain set of host names - * @param string - * @return - */ - static Set<String> parseHostList(String string) { - String[] addrs = StringUtils.getTrimmedStrings(string); - return new HashSet<String>(Arrays.asList(addrs)); - } - - /** - * read set of host names from a file - * @param fileName - * @return - */ - static Set<String> getHostListFromFile(String fileName) { - Set<String> nodes = new HashSet <String> (); - try { - HostsFileReader.readFileToSet("nodes", fileName, nodes); - return StringUtils.getTrimmedStrings(nodes); - } catch (IOException e) { - throw new IllegalArgumentException("Unable to open file: " + fileName); - } - } - } - static class Cli extends Configured implements Tool { /** * Parse arguments and then run Balancer. @@ -1635,10 +653,10 @@ public class Balancer { return Balancer.run(namenodes, parse(args), conf); } catch (IOException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.IO_EXCEPTION.code; + return ExitStatus.IO_EXCEPTION.getExitCode(); } catch (InterruptedException e) { System.out.println(e + ". Exiting ..."); - return ReturnStatus.INTERRUPTED.code; + return ExitStatus.INTERRUPTED.getExitCode(); } finally { System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); System.out.println("Balancing took " + time2Str(Time.now()-startTime)); @@ -1688,7 +706,7 @@ public class Balancer { checkArgument(++i < args.length, "File containing nodes to exclude is not specified: args = " + Arrays.toString(args)); - nodesTobeExcluded = Util.getHostListFromFile(args[i]); + nodesTobeExcluded = Util.getHostListFromFile(args[i], "exclude"); } else { nodesTobeExcluded = Util.parseHostList(args[i]); } @@ -1700,7 +718,7 @@ public class Balancer { checkArgument(++i < args.length, "File containing nodes to include is not specified: args = " + Arrays.toString(args)); - nodesTobeIncluded = Util.getHostListFromFile(args[i]); + nodesTobeIncluded = Util.getHostListFromFile(args[i], "include"); } else { nodesTobeIncluded = Util.parseHostList(args[i]); }
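For reference: ReturnStatus, removed above, is superseded by a shared ExitStatus enum that lives in its own file and is therefore not part of this diff. A plausible reconstruction from the removed enum and the call sites above (ILLEGAL_ARGS is renamed to ILLEGAL_ARGUMENTS, and the bare "code" field is replaced by a getExitCode() accessor); the actual definition may differ:

    /** Sketch of the ExitStatus enum assumed by the call sites above.
     *  Reconstructed from the removed ReturnStatus; not part of this diff. */
    public enum ExitStatus {
      SUCCESS(0),
      IN_PROGRESS(1),
      ALREADY_RUNNING(-1),
      NO_MOVE_BLOCK(-2),
      NO_MOVE_PROGRESS(-3),
      IO_EXCEPTION(-4),
      ILLEGAL_ARGUMENTS(-5),
      INTERRUPTED(-6);

      private final int code;

      ExitStatus(int code) {
        this.code = code;
      }

      /** @return the exit code of the balancer process. */
      public int getExitCode() {
        return code;
      }
    }

The numeric codes keep their old meanings, so scripts that check the balancer's exit status are unaffected by the rename.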
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Tue Aug 12 17:02:07 2014 @@ -34,6 +34,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; @@ -90,14 +94,16 @@ public class NameNodeConnector implement return blockpoolID; } - /** @return the namenode proxy. */ - public NamenodeProtocol getNamenode() { - return namenode; + /** @return blocks with locations. */ + public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size) + throws IOException { + return namenode.getBlocks(datanode, size); } - /** @return the client proxy. */ - public ClientProtocol getClient() { - return client; + /** @return live datanode storage reports. */ + public DatanodeStorageReport[] getLiveDatanodeStorageReport() + throws IOException { + return client.getDatanodeStorageReport(DatanodeReportType.LIVE); } /** @return the key manager */ Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Aug 12 17:02:07 2014 @@ -135,7 +135,10 @@ public class DatanodeManager { /** The number of stale DataNodes */ private volatile int numStaleNodes; - + + /** The number of stale storages */ + private volatile int numStaleStorages; + /** * Whether or not this cluster has ever consisted of more than 1 rack, * according to the NetworkTopology. @@ -1142,6 +1145,22 @@ public class DatanodeManager { return this.numStaleNodes; } + /** + * Get the number of content stale storages. + */ + public int getNumStaleStorages() { + return numStaleStorages; + } + + /** + * Set the number of content stale storages. 
+ * + * @param numStaleStorages The number of content stale storages. + */ + void setNumStaleStorages(int numStaleStorages) { + this.numStaleStorages = numStaleStorages; + } + /** Fetch live and dead datanodes. */ public void fetchDatanodes(final List<DatanodeDescriptor> live, final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) { Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Aug 12 17:02:07 2014 @@ -256,6 +256,7 @@ class HeartbeatManager implements Datano DatanodeID dead = null; // check the number of stale nodes int numOfStaleNodes = 0; + int numOfStaleStorages = 0; synchronized(this) { for (DatanodeDescriptor d : datanodes) { if (dead == null && dm.isDatanodeDead(d)) { @@ -265,10 +266,17 @@ class HeartbeatManager implements Datano if (d.isStale(dm.getStaleInterval())) { numOfStaleNodes++; } + DatanodeStorageInfo[] storageInfos = d.getStorageInfos(); + for(DatanodeStorageInfo storageInfo : storageInfos) { + if (storageInfo.areBlockContentsStale()) { + numOfStaleStorages++; + } + } } // Set the number of stale nodes in the DatanodeManager dm.setNumStaleNodes(numOfStaleNodes); + dm.setNumStaleStorages(numOfStaleStorages); } allAlive = dead == null; Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Aug 12 17:02:07 2014 @@ -601,7 +601,7 @@ class BPOfferService { LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr + " with " + actor.state + " state"); actor.reRegister(); - return true; + return false; } writeLock(); try { Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original) +++ 
hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Tue Aug 12 17:02:07 2014 @@ -222,7 +222,19 @@ class BPServiceActor implements Runnable // Second phase of the handshake with the NN. register(); } - + + // This is useful to make sure NN gets Heartbeat before Blockreport + // upon NN restart while DN keeps retrying Otherwise, + // 1. NN restarts. + // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister. + // 3. After reregistration completes, DN will send Blockreport first. + // 4. Given NN receives Blockreport after Heartbeat, it won't mark + // DatanodeStorageInfo#blockContentsStale to false until the next + // Blockreport. + void scheduleHeartbeat() { + lastHeartbeat = 0; + } + /** * This methods arranges for the data node to send the block report at * the next heartbeat. @@ -902,6 +914,7 @@ class BPServiceActor implements Runnable retrieveNamespaceInfo(); // and re-register register(); + scheduleHeartbeat(); } } Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1617532&r1=1617531&r2=1617532&view=diff ============================================================================== --- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original) +++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 12 17:02:07 2014 @@ -36,8 +36,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.Properties; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); + Set<String> existingStorageDirs = new HashSet<String>(); + for (int i = 0; i < getNumStorageDirs(); i++) { + existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); + } + // 1. For each BP data directory analyze the state and // check whether all is consistent before transitioning. 
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 12 17:02:07 2014
@@ -36,8 +36,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten
   void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
     // 1. For each BP data directory analyze the state and
     // check whether all is consistent before transitioning.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {
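Both this change and the DataStorage change below guard against re-adding a directory that is already managed: the currently managed roots are snapshotted into a set of absolute paths, and any candidate that matches is logged and skipped rather than locked or formatted a second time. A small generic sketch of that idempotent-add pattern (the class and method names are illustrative):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    public class IdempotentDirAdd {
      private final List<File> managedRoots = new ArrayList<File>();

      /** Adds only directories not already managed; returns the ones skipped. */
      public List<File> addAll(List<File> candidates) {
        Set<String> existing = new HashSet<String>();
        for (File root : managedRoots) {
          existing.add(root.getAbsolutePath());
        }
        List<File> skipped = new ArrayList<File>();
        for (Iterator<File> it = candidates.iterator(); it.hasNext();) {
          File dir = it.next();
          if (existing.contains(dir.getAbsolutePath())) {
            skipped.add(dir); // already in use: do not lock or format again
            it.remove();
            continue;
          }
          managedRoots.add(dir);
        }
        return skipped;
      }
    }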
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug 12 17:02:07 2014
@@ -55,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -172,43 +173,99 @@ public class DataStorage extends Storage
   }
 
   /**
-   * Analyze storage directories.
-   * Recover from previous transitions if required.
-   * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info.
-   * <br>
-   * This method should be synchronized between multiple DN threads. Only the
-   * first DN thread does DN level storage dir recoverTransitionRead.
-   *
+   * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
+   */
+  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+    this.layoutVersion = getServiceLayoutVersion();
+    for (StorageDirectory dir : dirs) {
+      writeProperties(dir);
+    }
+  }
+
+  /**
+   * Add a list of volumes to be managed by DataStorage. If a volume is empty,
+   * format it; otherwise, recover it from previous transitions if required.
+   *
+   * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(DataNode datanode,
+  synchronized void addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt)
       throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
+    // Similar to recoverTransitionRead, it first ensures the datanode level
+    // format is completed.
+    List<StorageLocation> tmpDataDirs =
+        new ArrayList<StorageLocation>(dataDirs);
+    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    String bpid = nsInfo.getBlockPoolID();
+    for (StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
     }
-    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and name-node layout version: " + nsInfo.getLayoutVersion());
-    
-    // 1. For each data directory calculate its state and
-    // check whether all is consistent before transitioning.
-    // Format and recover.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(
+          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+          nsInfo.getClusterID());
+    }
+
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpid, bpStorage);
+  }
+
+  /**
+   * Add a list of volumes to be managed by this DataStorage. If a volume is
+   * empty, it formats the volume; otherwise it recovers it from previous
+   * transitions if required.
+   *
+   * If isInitialize is false, only the directories that have finished the
+   * doTransition() process will be added into DataStorage.
+   *
+   * @param datanode the reference to DataNode.
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
+   * @param ignoreExistingDirs do not throw when no new directories remain.
+   * @throws IOException
+   */
+  private synchronized void addStorageLocations(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+      throws IOException {
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
+    // 1. For each data directory calculate its state and check whether all is
+    // consistent before transitioning. Format and recover.
+    ArrayList<StorageState> dataDirStates =
+        new ArrayList<StorageState>(dataDirs.size());
+    List<StorageDirectory> addedStorageDirectories =
+        new ArrayList<StorageDirectory>();
     for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next().getFile();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
-        switch(curState) {
+        switch (curState) {
         case NORMAL:
           break;
         case NON_EXISTENT:
@@ -217,7 +274,8 @@ public class DataStorage extends Storage
           it.remove();
           continue;
         case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted");
+          LOG.info("Storage directory " + dataDir + " is not formatted for "
+              + nsInfo.getBlockPoolID());
           LOG.info("Formatting ...");
           format(sd, nsInfo, datanode.getDatanodeUuid());
           break;
@@ -231,33 +289,82 @@ public class DataStorage extends Storage
         //continue with other good dirs
         continue;
       }
-      // add to the storage list
-      addStorageDir(sd);
+      if (isInitialize) {
+        addStorageDir(sd);
+      }
+      addedStorageDirectories.add(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0)  // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+      // none of the data dirs exist
+      if (ignoreExistingDirs) {
+        return;
+      }
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
+    }
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback
-    // while others could be uptodate for the regular startup.
-    try {
-      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-        doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-        createStorageID(getStorageDir(idx));
+    // During startup some of them can upgrade or rollback
+    // while others could be up-to-date for the regular startup.
+    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+        it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        doTransition(datanode, sd, nsInfo, startOpt);
+        createStorageID(sd);
+      } catch (IOException e) {
+        if (!isInitialize) {
+          sd.unlock();
+          it.remove();
+          continue;
+        }
+        unlockAll();
+        throw e;
       }
-    } catch (IOException e) {
-      unlockAll();
-      throw e;
     }
 
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
+    // 3. Update all successfully loaded storages. Some of them might have just
+    // been formatted.
+    this.writeAll(addedStorageDirectories);
+
+    // 4. Make newly loaded storage directories visible for service.
+    if (!isInitialize) {
+      this.storageDirs.addAll(addedStorageDirectories);
+    }
+  }
+
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required.
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads. Only the
+   * first DN thread does DN level storage dir recoverTransitionRead.
+   *
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @throws IOException
+   */
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
+      throws IOException {
+    if (initialized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
+    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
 
-    // 4. mark DN storage is initialized
+    // mark DN storage as initialized
     this.initialized = true;
   }
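The refactored DataStorage makes startup a reusable two-phase add: phase one formats or recovers the datanode-level root of each volume, and phase two creates and recovers the per-block-pool subtree under each volume's current directory, so the same path can serve both initial startup and a later hot-add. A sketch of the path derivation used for phase two (getBpRoot() here is an illustrative stand-in for BlockPoolSliceStorage.getBpRoot(); the block pool ID is made up):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    public class BpLayout {
      static final String STORAGE_DIR_CURRENT = "current";

      /** Stand-in: each block pool gets its own subtree under <volume>/current. */
      static File getBpRoot(String bpid, File dnCurrentDir) {
        return new File(dnCurrentDir, bpid);
      }

      /** Phase-two input: one block-pool root per freshly added volume. */
      static List<File> bpRootsFor(String bpid, List<File> volumeRoots) {
        List<File> bpRoots = new ArrayList<File>();
        for (File dnRoot : volumeRoots) {
          bpRoots.add(getBpRoot(bpid, new File(dnRoot, STORAGE_DIR_CURRENT)));
        }
        return bpRoots;
      }

      public static void main(String[] args) {
        List<File> roots = new ArrayList<File>();
        roots.add(new File("/data/dn1"));
        // prints the bp root under /data/dn1/current for the hypothetical bpid
        System.out.println(bpRootsFor("BP-1-127.0.0.1-123456789", roots));
      }
    }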
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Tue Aug 12 17:02:07 2014
@@ -78,7 +78,7 @@ public class StorageLocation {
    * @return A StorageLocation object if successfully parsed, null otherwise.
    * Does not throw any exceptions.
    */
-  static StorageLocation parse(String rawLocation)
+  public static StorageLocation parse(String rawLocation)
       throws IOException, SecurityException {
     Matcher matcher = regex.matcher(rawLocation);
     StorageType storageType = StorageType.DEFAULT;

Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Aug 12 17:02:07 2014
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** Add a collection of StorageLocations to FsDataset. */
+  public void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException;
+
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
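Making StorageLocation.parse() public lets code outside the package turn raw dfs.datanode.data.dir entries into typed locations. Those entries follow an optional "[storagetype]" prefix convention; a self-contained sketch of that parsing rule (an illustrative simplification: the real class also builds a URI and returns a StorageLocation object):

    import java.util.Arrays;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class LocationFormat {
      // Mirrors the "[storagetype]path" convention handled by StorageLocation.
      private static final Pattern REGEX = Pattern.compile("^\\[(\\w*)\\](.+)$");

      /** Returns {storageType, path}; unprefixed entries use DEFAULT. */
      static String[] parse(String rawLocation) {
        Matcher m = REGEX.matcher(rawLocation);
        if (m.matches()) {
          return new String[] { m.group(1).toUpperCase(), m.group(2) };
        }
        return new String[] { "DEFAULT", rawLocation };
      }

      public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("[disk]/data/dn1")));
        // prints [DISK, /data/dn1]
        System.out.println(Arrays.toString(parse("/data/dn2")));
        // prints [DEFAULT, /data/dn2]
      }
    }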
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Tue Aug 12 17:02:07 2014
@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
 
   private final DataNode datanode;
+  private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
 
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
    * 
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
-   * 
-   * @param volumes The roots of the data volumes.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+  FsDatasetAsyncDiskService(DataNode datanode) {
     this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+      int counter = 0;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        int thisIndex;
+        synchronized (this) {
+          thisIndex = counter++;
+        }
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async disk worker #" + thisIndex
+            + " for volume " + volume);
+        return t;
+      }
+    };
 
-    final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
-    // Create one ThreadPool per volume
-    for (int v = 0 ; v < volumes.length; v++) {
-      final File vol = volumes[v];
-      ThreadFactory threadFactory = new ThreadFactory() {
-          int counter = 0;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            int thisIndex;
-            synchronized (this) {
-              thisIndex = counter++;
-            }
-            Thread t = new Thread(threadGroup, r);
-            t.setName("Async disk worker #" + thisIndex +
-                " for volume " + vol);
-            return t;
-          }
-        };
-
-      ThreadPoolExecutor executor = new ThreadPoolExecutor(
-          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
-          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
-          new LinkedBlockingQueue<Runnable>(), threadFactory);
-
-      // This can reduce the number of running threads
-      executor.allowCoreThreadTimeOut(true);
-      executors.put(vol, executor);
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncDiskService for a new volume.
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " already exists.");
+    }
+    addExecutorForVolume(volume);
   }
 
   synchronized long countPendingDeletions() {
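The refactoring turns the one-shot "create a pool per volume in the constructor" loop into an incremental addVolume(), keeping the invariant of exactly one thread pool per volume root so that disk work on different volumes never queues behind each other. A compact sketch of that executor-per-key pattern (the constants and names are illustrative, not the Hadoop values):

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PerVolumeExecutors {
      private static final int CORE_THREADS = 1;
      private static final int MAX_THREADS = 4;
      private static final long KEEP_ALIVE_SECONDS = 60;

      private final Map<File, ThreadPoolExecutor> executors =
          new HashMap<File, ThreadPoolExecutor>();

      /** One pool per volume; rejects duplicates instead of silently reusing. */
      public synchronized void addVolume(File volume) {
        if (executors.containsKey(volume)) {
          throw new IllegalStateException("Volume " + volume + " already exists.");
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            CORE_THREADS, MAX_THREADS, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>());
        executor.allowCoreThreadTimeOut(true); // idle pools drop their threads
        executors.put(volume, executor);
      }

      /** Routes a task to the pool owned by the task's volume. */
      public synchronized void execute(File volume, Runnable task) {
        ThreadPoolExecutor executor = executors.get(volume);
        if (executor == null) {
          throw new IllegalStateException("Unknown volume: " + volume);
        }
        executor.execute(task);
      }
    }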
Modified: hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1617532&r1=1617531&r2=1617532&view=diff
==============================================================================
--- hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/fs-encryption/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Aug 12 17:02:07 2014
@@ -202,6 +202,7 @@ class FsDatasetImpl implements FsDataset
   final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
+  private final Configuration conf;
   private final int validVolsRequired;
 
   final ReplicaMap volumeMap;
@@ -216,6 +217,7 @@ class FsDatasetImpl implements FsDataset
       ) throws IOException {
     this.datanode = datanode;
     this.dataStorage = storage;
+    this.conf = conf;
     // The number of volumes required for operation is the total number
     // of volumes minus the number of failed volumes we can tolerate.
     final int volFailuresTolerated =
@@ -242,38 +244,76 @@ class FsDatasetImpl implements FsDataset
     }
 
     storageMap = new HashMap<String, DatanodeStorage>();
-    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
-        storage.getNumStorageDirs());
-    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = storage.getStorageDir(idx);
-      final File dir = sd.getCurrentDir();
-      final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
-      volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
-          storageType));
-      LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
-      storageMap.put(sd.getStorageUuid(),
-          new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
-    }
     volumeMap = new ReplicaMap(this);
-
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
            VolumeChoosingPolicy.class), conf);
-    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.initializeReplicaMaps(volumeMap);
+    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
 
-    File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+      addVolume(dataLocations, storage.getStorageDir(idx));
     }
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+
     cacheManager = new FsDatasetCache(this);
     registerMBean(datanode.getDatanodeUuid());
   }
 
+  private void addVolume(Collection<StorageLocation> dataLocations,
+      Storage.StorageDirectory sd) throws IOException {
+    final File dir = sd.getCurrentDir();
+    final StorageType storageType =
+        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+    // If an IOException is raised from FsVolumeImpl() or getVolumeMap(), there
+    // is nothing to roll back to keep the various data structures, e.g.,
+    // storageMap and asyncDiskService, consistent.
+    FsVolumeImpl fsVolume = new FsVolumeImpl(
+        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    fsVolume.getVolumeMap(volumeMap);
+
+    volumes.addVolume(fsVolume);
+    storageMap.put(sd.getStorageUuid(),
+        new DatanodeStorage(sd.getStorageUuid(),
+            DatanodeStorage.State.NORMAL,
            storageType));
+    asyncDiskService.addVolume(sd.getCurrentDir());
+
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+  }
+
+  /**
+   * Add a collection of StorageLocations to FsDataset.
+   *
+   * @pre dataStorage must have these volumes.
+   * @param volumes the locations of the volumes to add.
+   * @throws IOException
+   */
+  @Override
+  public synchronized void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException {
+    final Collection<StorageLocation> dataLocations =
+        DataNode.getStorageLocations(this.conf);
+    Map<String, Storage.StorageDirectory> allStorageDirs =
+        new HashMap<String, Storage.StorageDirectory>();
+    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+      allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+    }
+
+    for (StorageLocation vol : volumes) {
+      String key = vol.getFile().getAbsolutePath();
+      if (!allStorageDirs.containsKey(key)) {
+        LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+      } else {
+        addVolume(dataLocations, allStorageDirs.get(key));
+      }
+    }
+  }
+
   private StorageType getStorageTypeFromLocations(
       Collection<StorageLocation> dataLocations, File dir) {
     for (StorageLocation dataLocation : dataLocations) {
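With addVolumes() now part of FsDatasetSpi and implemented here, a caller that knows dataStorage has already prepared the directories could wire new volumes in roughly like this (a hedged sketch: the helper and its call site are illustrative, not an API added by this commit):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;

    import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    public class AddVolumesSketch {
      /**
       * Parses raw "[type]path" entries (for example "[DISK]/data/dn3") and
       * hands them to the dataset. Per the @pre above, dataStorage must
       * already manage these volumes; otherwise addVolumes() only logs a
       * warning for the unknown entry.
       */
      static void hotAddVolumes(FsDatasetSpi<?> dataset, String... rawLocations)
          throws IOException {
        Collection<StorageLocation> volumes = new ArrayList<StorageLocation>();
        for (String raw : rawLocations) {
          volumes.add(StorageLocation.parse(raw)); // public as of this change
        }
        dataset.addVolumes(volumes);
      }
    }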