Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Aug 19 23:49:39 2014 @@ -261,13 +261,19 @@ public class BlockManager { this.namesystem = namesystem; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); - invalidateBlocks = new InvalidateBlocks(datanodeManager); + + final long pendingPeriod = conf.getLong( + DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY, + DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L; + invalidateBlocks = new InvalidateBlocks( + datanodeManager.blockInvalidateLimit, pendingPeriod); // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap( LightWeightGSet.computeCapacity(2.0, "BlocksMap")); blockplacement = BlockPlacementPolicy.getInstance( - conf, stats, datanodeManager.getNetworkTopology()); + conf, stats, datanodeManager.getNetworkTopology(), + datanodeManager.getHost2DatanodeMap()); pendingReplications = new PendingReplicationBlocks(conf.getInt( DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); @@ -423,10 +429,8 @@ public class BlockManager { public void close() { try { - if (replicationThread != null) { - replicationThread.interrupt(); - replicationThread.join(3000); - } + replicationThread.interrupt(); + replicationThread.join(3000); } catch (InterruptedException ie) { } datanodeManager.close(); @@ -549,7 +553,6 @@ public class BlockManager { } /** - * @param block * @return true if the block has minimum replicas */ public boolean checkMinReplication(Block block) { @@ -699,7 +702,7 @@ public class BlockManager { // remove this block from the list of pending blocks to be deleted. 
for (DatanodeStorageInfo storage : targets) { - invalidateBlocks.remove(storage.getStorageID(), oldBlock); + invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock); } // Adjust safe-mode totals, since under-construction blocks don't @@ -722,9 +725,8 @@ public class BlockManager { final List<DatanodeStorageInfo> locations = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block)); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { - final String storageID = storage.getStorageID(); // filter invalidate replicas - if(!invalidateBlocks.contains(storageID, block)) { + if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) { locations.add(storage); } } @@ -819,7 +821,7 @@ public class BlockManager { for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); - if (isCorrupt || (!isCorrupt && !replicaCorrupt)) + if (isCorrupt || (!replicaCorrupt)) machines[j++] = storage; } } @@ -943,6 +945,16 @@ public class BlockManager { } /** + * Check if a block is replicated to at least the minimum replication. + */ + public boolean isSufficientlyReplicated(BlockInfo b) { + // Compare against the lesser of the minReplication and number of live DNs. + final int replication = + Math.min(minReplication, getDatanodeManager().getNumLiveDataNodes()); + return countNodes(b).liveReplicas() >= replication; + } + + /** * return a list of blocks & their locations on <code>datanode</code> whose * total size is <code>size</code> * @@ -1010,9 +1022,11 @@ public class BlockManager { while(it.hasNext()) { removeStoredBlock(it.next(), node); } + // Remove all pending DN messages referencing this DN. + pendingDNMessages.removeAllMessagesForDatanode(node); node.resetBlocks(); - invalidateBlocks.remove(node.getDatanodeUuid()); + invalidateBlocks.remove(node); // If the DN hasn't block-reported since the most recent // failover, then we may have been holding up on processing @@ -1035,6 +1049,9 @@ public class BlockManager { * datanode and log the operation */ void addToInvalidates(final Block block, final DatanodeInfo datanode) { + if (!namesystem.isPopulatingReplQueues()) { + return; + } invalidateBlocks.add(block, datanode, true); } @@ -1043,6 +1060,9 @@ public class BlockManager { * datanodes. */ private void addToInvalidates(Block b) { + if (!namesystem.isPopulatingReplQueues()) { + return; + } StringBuilder datanodes = new StringBuilder(); for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); @@ -1059,6 +1079,7 @@ public class BlockManager { * Mark the block belonging to datanode as corrupt * @param blk Block to be marked as corrupt * @param dn Datanode which holds the corrupt replica + * @param storageID if known, null otherwise. 
* @param reason a textual reason why the block should be marked corrupt, * for logging purposes */ @@ -1075,17 +1096,29 @@ public class BlockManager { + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason, - Reason.CORRUPTION_REPORTED), dn, storageID); - } - private void markBlockAsCorrupt(BlockToMarkCorrupt b, - DatanodeInfo dn, String storageID) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot mark " + b - + " as corrupt because datanode " + dn + " does not exist"); + throw new IOException("Cannot mark " + blk + + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + + ") does not exist"); } + + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), + storageID == null ? null : node.getStorageInfo(storageID), + node); + } + + /** + * + * @param b + * @param storageInfo storage that contains the block, if known. null otherwise. + * @throws IOException + */ + private void markBlockAsCorrupt(BlockToMarkCorrupt b, + DatanodeStorageInfo storageInfo, + DatanodeDescriptor node) throws IOException { BlockCollection bc = b.corrupted.getBlockCollection(); if (bc == null) { @@ -1096,12 +1129,32 @@ public class BlockManager { } // Add replica to the data-node if it is not already there - node.addBlock(storageID, b.stored); + if (storageInfo != null) { + storageInfo.addBlock(b.stored); + } // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, b.reasonCode); - if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) { + + NumberReplicas numberOfReplicas = countNodes(b.stored); + boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc + .getBlockReplication(); + boolean minReplicationSatisfied = + numberOfReplicas.liveReplicas() >= minReplication; + boolean hasMoreCorruptReplicas = minReplicationSatisfied && + (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > + bc.getBlockReplication(); + boolean corruptedDuringWrite = minReplicationSatisfied && + (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()); + // case 1: have enough number of live replicas + // case 2: corrupted replicas + live replicas > Replication factor + // case 3: Block is marked corrupt due to failure while writing. In this + // case genstamp will be different than that of valid block. + // In all these cases we can delete the replica. 
+ // In case of 3, rbw block will be deleted and valid block can be replicated + if (hasEnoughLiveReplicas || hasMoreCorruptReplicas + || corruptedDuringWrite) { // the block is over-replicated so invalidate the replicas immediately invalidateBlock(b, node); } else if (namesystem.isPopulatingReplQueues()) { @@ -1179,14 +1232,20 @@ public class BlockManager { * @return total number of block for deletion */ int computeInvalidateWork(int nodesToProcess) { - final List<String> nodes = invalidateBlocks.getStorageIDs(); + final List<DatanodeInfo> nodes = invalidateBlocks.getDatanodes(); Collections.shuffle(nodes); nodesToProcess = Math.min(nodes.size(), nodesToProcess); int blockCnt = 0; - for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) { - blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt)); + for (DatanodeInfo dnInfo : nodes) { + int blocks = invalidateWorkForOneNode(dnInfo); + if (blocks > 0) { + blockCnt += blocks; + if (--nodesToProcess == 0) { + break; + } + } } return blockCnt; } @@ -1411,7 +1470,7 @@ public class BlockManager { * @throws IOException * if the number of targets < minimum replication. * @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * List, boolean, Set, long) + * List, boolean, Set, long, StorageType) */ public DatanodeStorageInfo[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -1648,7 +1707,7 @@ public class BlockManager { * @throws IOException */ public boolean processReport(final DatanodeID nodeID, - final DatanodeStorage storage, final String poolId, + final DatanodeStorage storage, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock @@ -1680,9 +1739,9 @@ public class BlockManager { if (storageInfo.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. This shortens restart times. - processFirstBlockReport(node, storage.getStorageID(), newReport); + processFirstBlockReport(storageInfo, newReport); } else { - processReport(node, storage, newReport); + processReport(storageInfo, newReport); } // Now that we have an up-to-date block report, we know that any @@ -1708,6 +1767,7 @@ public class BlockManager { } blockLog.info("BLOCK* processReport: from storage " + storage.getStorageID() + " node " + nodeID + ", blocks: " + newReport.getNumberOfBlocks() + + ", hasStaleStorages: " + node.hasStaleStorages() + ", processing time: " + (endTime - startTime) + " msecs"); return !node.hasStaleStorages(); } @@ -1743,9 +1803,8 @@ public class BlockManager { } } - private void processReport(final DatanodeDescriptor node, - final DatanodeStorage storage, - final BlockListAsLongs report) throws IOException { + private void processReport(final DatanodeStorageInfo storageInfo, + final BlockListAsLongs report) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. 
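The rewritten markBlockAsCorrupt above reduces to three independent conditions under which a corrupt replica may be invalidated immediately. The following is a minimal standalone sketch of that decision, not the committed code; ReplicaCounts and canInvalidateImmediately are illustrative stand-ins for NumberReplicas and the inline checks in the patch:

    class ReplicaCounts {
      final int live;      // live replicas of the block
      final int corrupt;   // replicas reported corrupt
      ReplicaCounts(int live, int corrupt) {
        this.live = live;
        this.corrupt = corrupt;
      }
    }

    class CorruptReplicaPolicy {
      /**
       * Case 1: enough live replicas already exist.
       * Case 2: live + corrupt replicas exceed the replication factor.
       * Case 3: the replica was corrupted while being written, so its
       *         genstamp lags the stored block's (an RBW replica to drop).
       * Cases 2 and 3 additionally require minReplication live replicas.
       */
      static boolean canInvalidateImmediately(ReplicaCounts c,
          int replicationFactor, int minReplication,
          long storedGenStamp, long corruptGenStamp) {
        boolean hasEnoughLiveReplicas = c.live >= replicationFactor;
        boolean minReplicationSatisfied = c.live >= minReplication;
        boolean hasMoreCorruptReplicas = minReplicationSatisfied
            && (c.live + c.corrupt) > replicationFactor;
        boolean corruptedDuringWrite = minReplicationSatisfied
            && storedGenStamp > corruptGenStamp;
        return hasEnoughLiveReplicas || hasMoreCorruptReplicas
            || corruptedDuringWrite;
      }
    }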
@@ -1755,19 +1814,20 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - reportDiff(node, storage, report, + reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storage.getStorageID()); + addStoredBlockUnderConstruction(b, storageInfo); } for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -1781,7 +1841,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storage.getStorageID()); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -1792,16 +1852,16 @@ public class BlockManager { * a toRemove list (since there won't be any). It also silently discards * any invalid blocks, thereby deferring their processing until * the next block report. - * @param node - DatanodeDescriptor of the node that sent the report + * @param storageInfo - DatanodeStorageInfo that sent the report * @param report - the initial block report, to be processed * @throws IOException */ - private void processFirstBlockReport(final DatanodeDescriptor node, - final String storageID, + private void processFirstBlockReport( + final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); - assert (node.getStorageInfo(storageID).numBlocks() == 0); + assert (storageInfo.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); while(itBR.hasNext()) { @@ -1810,7 +1870,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } @@ -1822,15 +1882,16 @@ public class BlockManager { // If block is corrupt, mark it and continue to next block. BlockUCState ucState = storedBlock.getBlockUCState(); BlockToMarkCorrupt c = checkReplicaCorrupt( - iblk, reportedState, storedBlock, ucState, node); + iblk, reportedState, storedBlock, ucState, + storageInfo.getDatanodeDescriptor()); if (c != null) { if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. 
- queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c, node, storageID); + markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor()); } continue; } @@ -1838,7 +1899,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( - node.getStorageInfo(storageID), iblk, reportedState); + storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 @@ -1851,12 +1912,12 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, node, storageID); + addStoredBlockImmediate(storedBlock, storageInfo); } } } - private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, + private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, Collection<BlockInfo> toAdd, // add to DatanodeDescriptor Collection<Block> toRemove, // remove from DatanodeDescriptor @@ -1864,8 +1925,6 @@ public class BlockManager { Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list - final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID()); - // place a delimiter in the list which separates blocks // that have been reported from those that have not BlockInfo delimiter = new BlockInfo(new Block(), 1); @@ -1882,7 +1941,7 @@ public class BlockManager { while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState iState = itBR.getCurrentReplicaState(); - BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -1919,7 +1978,7 @@ public class BlockManager { * BlockInfoUnderConstruction's list of replicas.</li> * </ol> * - * @param dn descriptor for the datanode that made the report + * @param storageInfo DatanodeStorageInfo that sent the report. * @param block reported block replica * @param reportedState reported replica state * @param toAdd add to DatanodeDescriptor @@ -1931,14 +1990,16 @@ public class BlockManager { * @return the up-to-date stored block, if it should be kept. * Otherwise, null. 
*/ - private BlockInfo processReportedBlock(final DatanodeDescriptor dn, - final String storageID, + private BlockInfo processReportedBlock( + final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, final Collection<BlockInfo> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + if(LOG.isDebugEnabled()) { LOG.debug("Reported block " + block + " on " + dn + " size " + block.getNumBytes() @@ -1947,7 +2008,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(dn, storageID, block, reportedState, + queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } @@ -1968,7 +2029,7 @@ public class BlockManager { } // Ignore replicas already scheduled to be removed from the DN - if(invalidateBlocks.contains(dn.getDatanodeUuid(), block)) { + if(invalidateBlocks.contains(dn, block)) { /* * TODO: following assertion is incorrect, see HDFS-2668 assert * storedBlock.findDatanode(dn) < 0 : "Block " + block + @@ -1984,7 +2045,10 @@ public class BlockManager { // If the block is an out-of-date generation stamp or state, // but we're the standby, we shouldn't treat it as corrupt, // but instead just queue it for later processing. - queueReportedBlock(dn, storageID, storedBlock, reportedState, + // TODO: Pretty confident this should be s/storedBlock/block below, + // since we should be postponing the info of the reported block, not + // the stored block. See HDFS-6289 for more context. + queueReportedBlock(storageInfo, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { toCorrupt.add(c); @@ -2001,7 +2065,7 @@ public class BlockManager { // Add replica if appropriate. If the replica was previously corrupt // but now okay, it might need to be updated. if (reportedState == ReplicaState.FINALIZED - && (storedBlock.findDatanode(dn) < 0 + && (!storedBlock.findDatanode(dn) || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { toAdd.add(storedBlock); } @@ -2013,17 +2077,17 @@ public class BlockManager { * standby node. @see PendingDataNodeMessages. 
* @param reason a textual reason to report in the debug logs */ - private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, String reason) { assert shouldPostponeBlocksFromFuture; if (LOG.isDebugEnabled()) { LOG.debug("Queueing reported block " + block + " in state " + reportedState + - " from datanode " + dn + " for later processing " + - "because " + reason + "."); + " from datanode " + storageInfo.getDatanodeDescriptor() + + " for later processing because " + reason + "."); } - pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); + pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState); } /** @@ -2046,7 +2110,7 @@ public class BlockManager { if (LOG.isDebugEnabled()) { LOG.debug("Processing previously queued message " + rbi); } - processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), + processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } @@ -2103,6 +2167,16 @@ public class BlockManager { } else { return null; // not corrupt } + case UNDER_CONSTRUCTION: + if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { + final long reportedGS = reported.getGenerationStamp(); + return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + + ucState + " and reported state " + reportedState + + ", but reported genstamp " + reportedGS + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + } + return null; default: return null; } @@ -2166,19 +2240,20 @@ public class BlockManager { } void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, - DatanodeDescriptor node, String storageID) throws IOException { + DatanodeStorageInfo storageInfo) throws IOException { BlockInfoUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent(node.getStorageInfo(storageID), - ucBlock.reportedBlock, ucBlock.reportedState); + block.addReplicaIfNotPresent( + storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); - if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { - addStoredBlock(block, node, storageID, null, true); + if (ucBlock.reportedState == ReplicaState.FINALIZED && + !block.findDatanode(storageInfo.getDatanodeDescriptor())) { + addStoredBlock(block, storageInfo, null, true); } } /** * Faster version of - * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} + * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)} * , intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock().
Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from @@ -2189,17 +2264,17 @@ public class BlockManager { * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, - DatanodeDescriptor node, String storageID) + DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, node, storageID, null, false); + addStoredBlock(storedBlock, storageInfo, null, false); return; } // just add it - node.addBlock(storageID, storedBlock); + storageInfo.addBlock(storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2221,13 +2296,13 @@ public class BlockManager { * @return the block that is stored in blockMap. */ private Block addStoredBlock(final BlockInfo block, - DatanodeDescriptor node, - String storageID, + DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread storedBlock = blocksMap.getStoredBlock(block); @@ -2243,12 +2318,11 @@ public class BlockManager { // it will happen in next block report otherwise. return block; } - assert storedBlock != null : "Block must be stored by now"; BlockCollection bc = storedBlock.getBlockCollection(); assert bc != null : "Block must belong to a file"; // add block to the datanode - boolean added = node.addBlock(storageID, storedBlock); + boolean added = storageInfo.addBlock(storedBlock); int curReplicaDelta; if (added) { @@ -2587,7 +2661,7 @@ public class BlockManager { if (addedNode == delNodeHint) { delNodeHint = null; } - Collection<DatanodeDescriptor> nonExcess = new ArrayList<DatanodeDescriptor>(); + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -2607,7 +2681,7 @@ public class BlockManager { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas if (corruptNodes == null || !corruptNodes.contains(cur)) { - nonExcess.add(cur); + nonExcess.add(storage); } } } @@ -2631,7 +2705,7 @@ public class BlockManager { * If no such a node is available, * then pick a node with least free space */ - private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess, + private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, Block b, short replication, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint, @@ -2639,28 +2713,33 @@ public class BlockManager { assert namesystem.hasWriteLock(); // first form a rack to datanodes map and BlockCollection bc = getBlockCollection(b); - final Map<String, List<DatanodeDescriptor>> rackMap - = new HashMap<String, List<DatanodeDescriptor>>(); - final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>(); - final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>(); + + final Map<String, List<DatanodeStorageInfo>> rackMap + = new HashMap<String, List<DatanodeStorageInfo>>(); + final List<DatanodeStorageInfo> 
moreThanOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica // exactlyOne contains the remaining nodes - replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, - exactlyOne); + replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne); // pick one node to delete that favors the delete hint // otherwise pick one with least space from priSet if it is not empty // otherwise one node with least space from remains boolean firstOne = true; + final DatanodeStorageInfo delNodeHintStorage + = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint); + final DatanodeStorageInfo addedNodeStorage + = DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode); while (nonExcess.size() - replication > 0) { // check if we can delete delNodeHint - final DatanodeInfo cur; - if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) - && (moreThanOne.contains(delNodeHint) - || (addedNode != null && !moreThanOne.contains(addedNode))) ) { - cur = delNodeHint; + final DatanodeStorageInfo cur; + if (firstOne && delNodeHintStorage != null + && (moreThanOne.contains(delNodeHintStorage) + || (addedNodeStorage != null + && !moreThanOne.contains(addedNodeStorage)))) { + cur = delNodeHintStorage; } else { // regular excessive replica removal cur = replicator.chooseReplicaToDelete(bc, b, replication, moreThanOne, exactlyOne); @@ -2672,7 +2751,7 @@ public class BlockManager { exactlyOne, cur); nonExcess.remove(cur); - addToExcessReplicate(cur, b); + addToExcessReplicate(cur.getDatanodeDescriptor(), b); // // The 'excessblocks' tracks blocks until we get confirmation @@ -2683,7 +2762,7 @@ public class BlockManager { // should be deleted. Items are removed from the invalidate list // upon giving instructions to the namenode. // - addToInvalidates(b, cur); + addToInvalidates(b, cur.getDatanodeDescriptor()); blockLog.info("BLOCK* chooseExcessReplicates: " +"("+cur+", "+b+") is added to invalidated blocks set"); } @@ -2772,12 +2851,15 @@ public class BlockManager { } else { final String[] datanodeUuids = new String[locations.size()]; final String[] storageIDs = new String[datanodeUuids.length]; + final StorageType[] storageTypes = new StorageType[datanodeUuids.length]; for(int i = 0; i < locations.size(); i++) { final DatanodeStorageInfo s = locations.get(i); datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid(); storageIDs[i] = s.getStorageID(); + storageTypes[i] = s.getStorageType(); } - results.add(new BlockWithLocations(block, datanodeUuids, storageIDs)); + results.add(new BlockWithLocations(block, datanodeUuids, storageIDs, + storageTypes)); return block.getNumBytes(); } } @@ -2786,8 +2868,9 @@ public class BlockManager { * The given node is reporting that it received a certain block. */ @VisibleForTesting - void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) + void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint) throws IOException { + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // RECEIVED_BLOCK), we currently also decrease the approximate number. @@ -2807,12 +2890,12 @@ public class BlockManager { // Modify the blocks->datanode map and node's map. 
// pendingReplications.decrement(block, node); - processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, + processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock(DatanodeDescriptor node, - String storageID, Block block, + private void processAndHandleReportedBlock( + DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block @@ -2820,7 +2903,9 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - processReportedBlock(node, storageID, block, reportedState, + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); + + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it @@ -2828,11 +2913,11 @@ public class BlockManager { : "The block should be only in one of the lists."; for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storageID); + addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2846,7 +2931,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storageID); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -2873,13 +2958,15 @@ public class BlockManager { "Got incremental block report from unregistered or dead node"); } - if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) { + DatanodeStorageInfo storageInfo = + node.getStorageInfo(srdb.getStorage().getStorageID()); + if (storageInfo == null) { // The DataNode is reporting an unknown storage. Usually the NN learns // about new storages from heartbeats but during NN restart we may // receive a block report or incremental report before the heartbeat. // We must handle this for protocol compatibility. This issue was // uncovered by HDFS-6094. - node.updateStorage(srdb.getStorage()); + storageInfo = node.updateStorage(srdb.getStorage()); } for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { @@ -2889,14 +2976,13 @@ public class BlockManager { deleted++; break; case RECEIVED_BLOCK: - addBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), rdbi.getDelHints()); + addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints()); received++; break; case RECEIVING_BLOCK: receiving++; - processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), ReplicaState.RBW, null); + processAndHandleReportedBlock(storageInfo, rdbi.getBlock(), + ReplicaState.RBW, null); break; default: String msg = @@ -2999,10 +3085,14 @@ public class BlockManager { /** * On stopping decommission, check if the node has excess replicas. - * If there are any excess replicas, call processOverReplicatedBlock() + * If there are any excess replicas, call processOverReplicatedBlock(). + * Process over replicated blocks only when active NN is out of safe mode. 
*/ void processOverReplicatedBlocksOnReCommission( final DatanodeDescriptor srcNode) { + if (!namesystem.isPopulatingReplQueues()) { + return; + } final Iterator<? extends Block> it = srcNode.getBlockIterator(); int numOverReplicated = 0; while(it.hasNext()) { @@ -3068,11 +3158,13 @@ public class BlockManager { } } if (!neededReplications.contains(block) && - pendingReplications.getNumReplicas(block) == 0) { + pendingReplications.getNumReplicas(block) == 0 && + namesystem.isPopulatingReplQueues()) { // // These blocks have been reported from the datanode // after the startDecommission method has been executed. These // blocks were in flight when the decommissioning was started. + // Process these blocks only when active NN is out of safe mode. // neededReplications.add(block, curReplicas, @@ -3082,6 +3174,15 @@ public class BlockManager { } } } + + if (!status && !srcNode.isAlive) { + LOG.warn("srcNode " + srcNode + " is dead " + + "when decommission is in progress. Continue to mark " + + "it as decommission in progress. In that way, when it rejoins the " + + "cluster it can continue the decommission process."); + status = true; + } + srcNode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles); @@ -3186,9 +3287,8 @@ public class BlockManager { * * @return number of blocks scheduled for removal during this iteration. */ - private int invalidateWorkForOneNode(String nodeId) { + private int invalidateWorkForOneNode(DatanodeInfo dn) { final List<Block> toInvalidate; - final DatanodeDescriptor dn; namesystem.writeLock(); try { @@ -3197,15 +3297,13 @@ public class BlockManager { LOG.debug("In safemode, not computing replication work"); return 0; } - // get blocks to invalidate for the nodeId - assert nodeId != null; - dn = datanodeManager.getDatanode(nodeId); - if (dn == null) { - invalidateBlocks.remove(nodeId); - return 0; - } - toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn); - if (toInvalidate == null) { + try { + toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn)); + + if (toInvalidate == null) { + return 0; + } + } catch(UnregisteredNodeException une) { return 0; } } finally { @@ -3287,12 +3385,7 @@ public class BlockManager { } public int getCapacity() { - namesystem.readLock(); - try { - return blocksMap.getCapacity(); - } finally { - namesystem.readUnlock(); - } + return blocksMap.getCapacity(); } /** @@ -3344,8 +3437,11 @@ public class BlockManager { public void run() { while (namesystem.isRunning()) { try { - computeDatanodeWork(); - processPendingReplications(); + // Process replication work only when active NN is out of safe mode. + if (namesystem.isPopulatingReplQueues()) { + computeDatanodeWork(); + processPendingReplications(); + } Thread.sleep(replicationRecheckInterval); } catch (Throwable t) { if (!namesystem.isRunning()) { @@ -3373,7 +3469,6 @@ public class BlockManager { * heartbeat. * * @return number of blocks scheduled for replication or removal. - * @throws IOException */ int computeDatanodeWork() { // Blocks should not be replicated or removed if in safe mode.
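Among the BlockManager changes above, computeInvalidateWork now shuffles the candidate datanodes and charges a node against the nodesToProcess budget only when it actually yields blocks to delete. A rough self-contained sketch of that loop shape, under the assumption that per-node work is abstracted behind an interface (PerNodeWork here is a hypothetical stand-in for invalidateWorkForOneNode):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class InvalidateWorkSketch {
      interface PerNodeWork {
        /** Returns the number of blocks scheduled for deletion on one node. */
        int invalidateWorkFor(String datanodeUuid);
      }

      static int computeInvalidateWork(List<String> datanodes,
          int nodesToProcess, PerNodeWork work) {
        // Shuffle so deletion work is spread evenly across datanodes.
        List<String> nodes = new ArrayList<String>(datanodes);
        Collections.shuffle(nodes);
        nodesToProcess = Math.min(nodes.size(), nodesToProcess);
        int blockCnt = 0;
        for (String dn : nodes) {
          int blocks = work.invalidateWorkFor(dn);
          if (blocks > 0) {
            // Only nodes that produced work count against the budget.
            blockCnt += blocks;
            if (--nodesToProcess == 0) {
              break;
            }
          }
        }
        return blockCnt;
      }
    }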
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java Tue Aug 19 23:49:39 2014 @@ -61,7 +61,7 @@ public abstract class BlockPlacementPoli * @param srcPath the file to which this chooseTargets is being invoked. * @param numOfReplicas additional number of replicas wanted. * @param writer the writer's machine, null if not in the cluster. - * @param chosenNodes datanodes that have been chosen as targets. + * @param chosen datanodes that have been chosen as targets. * @param returnChosenNodes decide if the chosenNodes are returned. * @param excludedNodes datanodes that should not be considered as targets. * @param blocksize size of the data to be written. @@ -78,8 +78,8 @@ public abstract class BlockPlacementPoli StorageType storageType); /** - * Same as {@link #chooseTarget(String, int, Node, List, boolean, - * Set, long)} with added parameter {@code favoredDatanodes} + * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)} + * with added parameter {@code favoredDatanodes} * @param favoredNodes datanodes that should be favored as targets. This * is only a hint and due to cluster state, namenode may not be * able to place the blocks on these datanodes. @@ -124,11 +124,12 @@ public abstract class BlockPlacementPoli listed in the previous parameter. * @return the replica that is the best candidate for deletion */ - abstract public DatanodeDescriptor chooseReplicaToDelete(BlockCollection srcBC, - Block block, - short replicationFactor, - Collection<DatanodeDescriptor> existingReplicas, - Collection<DatanodeDescriptor> moreExistingReplicas); + abstract public DatanodeStorageInfo chooseReplicaToDelete( + BlockCollection srcBC, + Block block, + short replicationFactor, + Collection<DatanodeStorageInfo> existingReplicas, + Collection<DatanodeStorageInfo> moreExistingReplicas); /** * Used to setup a BlockPlacementPolicy object. This should be defined by @@ -139,11 +140,13 @@ public abstract class BlockPlacementPoli * @param clusterMap cluster topology */ abstract protected void initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap); /** * Get an instance of the configured Block Placement Policy based on the - * the configuration property {@link DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}. + * the configuration property + * {@link DFSConfigKeys#DFS_BLOCK_REPLICATOR_CLASSNAME_KEY}. * * @param conf the configuration to be used * @param stats an object that is used to retrieve the load on the cluster @@ -152,14 +155,15 @@ public abstract class BlockPlacementPoli */ public static BlockPlacementPolicy getInstance(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { final Class<? 
extends BlockPlacementPolicy> replicatorClass = conf.getClass( DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT, BlockPlacementPolicy.class); final BlockPlacementPolicy replicator = ReflectionUtils.newInstance( replicatorClass, conf); - replicator.initialize(conf, stats, clusterMap); + replicator.initialize(conf, stats, clusterMap, host2datanodeMap); return replicator; } @@ -172,21 +176,23 @@ public abstract class BlockPlacementPoli * @param exactlyOne The List of replica nodes on rack with only one replica * @param cur current replica to remove */ - public void adjustSetsWithChosenReplica(final Map<String, - List<DatanodeDescriptor>> rackMap, - final List<DatanodeDescriptor> moreThanOne, - final List<DatanodeDescriptor> exactlyOne, final DatanodeInfo cur) { + public void adjustSetsWithChosenReplica( + final Map<String, List<DatanodeStorageInfo>> rackMap, + final List<DatanodeStorageInfo> moreThanOne, + final List<DatanodeStorageInfo> exactlyOne, + final DatanodeStorageInfo cur) { - String rack = getRack(cur); - final List<DatanodeDescriptor> datanodes = rackMap.get(rack); - datanodes.remove(cur); - if (datanodes.isEmpty()) { + final String rack = getRack(cur.getDatanodeDescriptor()); + final List<DatanodeStorageInfo> storages = rackMap.get(rack); + storages.remove(cur); + if (storages.isEmpty()) { rackMap.remove(rack); } if (moreThanOne.remove(cur)) { - if (datanodes.size() == 1) { - moreThanOne.remove(datanodes.get(0)); - exactlyOne.add(datanodes.get(0)); + if (storages.size() == 1) { + final DatanodeStorageInfo remaining = storages.get(0); + moreThanOne.remove(remaining); + exactlyOne.add(remaining); } } else { exactlyOne.remove(cur); @@ -195,7 +201,6 @@ public abstract class BlockPlacementPoli /** * Get rack string from a data node - * @param datanode * @return rack of data node */ protected String getRack(final DatanodeInfo datanode) { @@ -206,34 +211,34 @@ public abstract class BlockPlacementPoli * Split data nodes into two sets, one set includes nodes on rack with * more than one replica, the other set contains the remaining nodes. 
* - * @param dataNodes + * @param dataNodes datanodes to be split into two sets * @param rackMap a map from rack to datanodes * @param moreThanOne contains nodes on rack with more than one replica * @param exactlyOne remains contains the remaining nodes */ public void splitNodesWithRack( - Collection<DatanodeDescriptor> dataNodes, - final Map<String, List<DatanodeDescriptor>> rackMap, - final List<DatanodeDescriptor> moreThanOne, - final List<DatanodeDescriptor> exactlyOne) { - for(DatanodeDescriptor node : dataNodes) { - final String rackName = getRack(node); - List<DatanodeDescriptor> datanodeList = rackMap.get(rackName); - if (datanodeList == null) { - datanodeList = new ArrayList<DatanodeDescriptor>(); - rackMap.put(rackName, datanodeList); + final Iterable<DatanodeStorageInfo> storages, + final Map<String, List<DatanodeStorageInfo>> rackMap, + final List<DatanodeStorageInfo> moreThanOne, + final List<DatanodeStorageInfo> exactlyOne) { + for(DatanodeStorageInfo s: storages) { + final String rackName = getRack(s.getDatanodeDescriptor()); + List<DatanodeStorageInfo> storageList = rackMap.get(rackName); + if (storageList == null) { + storageList = new ArrayList<DatanodeStorageInfo>(); + rackMap.put(rackName, storageList); } - datanodeList.add(node); + storageList.add(s); } // split nodes into two sets - for(List<DatanodeDescriptor> datanodeList : rackMap.values()) { - if (datanodeList.size() == 1) { + for(List<DatanodeStorageInfo> storageList : rackMap.values()) { + if (storageList.size() == 1) { // exactlyOne contains nodes on rack with only one replica - exactlyOne.add(datanodeList.get(0)); + exactlyOne.add(storageList.get(0)); } else { // moreThanOne contains nodes on rack with more than one replica - moreThanOne.addAll(datanodeList); + moreThanOne.addAll(storageList); } } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Tue Aug 19 23:49:39 2014 @@ -70,6 +70,7 @@ public class BlockPlacementPolicyDefault protected boolean considerLoad; private boolean preferLocalNode = true; protected NetworkTopology clusterMap; + protected Host2NodesMap host2datanodeMap; private FSClusterStats stats; protected long heartbeatInterval; // interval for DataNode heartbeats private long staleInterval; // interval used to identify stale DataNodes @@ -80,8 +81,9 @@ public class BlockPlacementPolicyDefault protected int tolerateHeartbeatMultiplier; protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + initialize(conf, stats, clusterMap, host2datanodeMap); } protected BlockPlacementPolicyDefault() { @@ -89,11 +91,13 @@ public class BlockPlacementPolicyDefault @Override public void 
initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { this.considerLoad = conf.getBoolean( DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); this.stats = stats; this.clusterMap = clusterMap; + this.host2datanodeMap = host2datanodeMap; this.heartbeatInterval = conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; @@ -141,14 +145,14 @@ public class BlockPlacementPolicyDefault List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(); boolean avoidStaleNodes = stats != null && stats.isAvoidingStaleDataNodesForWrite(); - for (int i = 0; i < Math.min(favoredNodes.size(), numOfReplicas); i++) { + for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; i++) { DatanodeDescriptor favoredNode = favoredNodes.get(i); // Choose a single node which is local to favoredNode. // 'results' is updated within chooseLocalNode final DatanodeStorageInfo target = chooseLocalStorage(favoredNode, favoriteAndExcludedNodes, blocksize, getMaxNodesPerRack(results.size(), numOfReplicas)[1], - results, avoidStaleNodes, storageType); + results, avoidStaleNodes, storageType, false); if (target == null) { LOG.warn("Could not find a target for file " + src + " with favored node " + favoredNode); @@ -267,7 +271,7 @@ public class BlockPlacementPolicyDefault try { if (numOfResults == 0) { writer = chooseLocalStorage(writer, excludedNodes, blocksize, - maxNodesPerRack, results, avoidStaleNodes, storageType) + maxNodesPerRack, results, avoidStaleNodes, storageType, true) .getDatanodeDescriptor(); if (--numOfReplicas == 0) { return writer; @@ -341,12 +345,14 @@ public class BlockPlacementPolicyDefault int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) + StorageType storageType, + boolean fallbackToLocalRack) throws NotEnoughReplicasException { // if no local machine, randomly choose one node - if (localMachine == null) + if (localMachine == null) { return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); + } if (preferLocalNode && localMachine instanceof DatanodeDescriptor) { DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; // otherwise try local machine first @@ -359,7 +365,11 @@ public class BlockPlacementPolicyDefault } } } - } + } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); @@ -632,15 +642,11 @@ public class BlockPlacementPolicyDefault // check the communication traffic of the target machine if (considerLoad) { - double avgLoad = 0; - if (stats != null) { - int size = stats.getNumDatanodesInService(); - if (size != 0) { - avgLoad = (double)stats.getTotalLoad()/size; - } - } - if (node.getXceiverCount() > (2.0 * avgLoad)) { - logNodeIsNotChosen(storage, "the node is too busy "); + final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if (nodeLoad > maxLoad) { + logNodeIsNotChosen(storage, + "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") "); return false; } } @@ -723,31 +729,34 @@ public class BlockPlacementPolicyDefault } @Override - public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, + public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc, 
Block block, short replicationFactor, - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { long oldestHeartbeat = now() - heartbeatInterval * tolerateHeartbeatMultiplier; - DatanodeDescriptor oldestHeartbeatNode = null; + DatanodeStorageInfo oldestHeartbeatStorage = null; long minSpace = Long.MAX_VALUE; - DatanodeDescriptor minSpaceNode = null; + DatanodeStorageInfo minSpaceStorage = null; // Pick the node with the oldest heartbeat or with the least free space, // if all heartbeats are within the tolerable heartbeat interval - for(DatanodeDescriptor node : pickupReplicaSet(first, second)) { + for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) { + final DatanodeDescriptor node = storage.getDatanodeDescriptor(); long free = node.getRemaining(); long lastHeartbeat = node.getLastUpdate(); if(lastHeartbeat < oldestHeartbeat) { oldestHeartbeat = lastHeartbeat; - oldestHeartbeatNode = node; + oldestHeartbeatStorage = storage; } if (minSpace > free) { minSpace = free; - minSpaceNode = node; + minSpaceStorage = storage; } } - return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode; + + return oldestHeartbeatStorage != null? oldestHeartbeatStorage + : minSpaceStorage; } /** @@ -756,9 +765,9 @@ public class BlockPlacementPolicyDefault * replica while second set contains remaining replica nodes. * So pick up first set if not empty. If first is empty, then pick second. */ - protected Collection<DatanodeDescriptor> pickupReplicaSet( - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + protected Collection<DatanodeStorageInfo> pickupReplicaSet( + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { return first.isEmpty() ?
second : first; } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Tue Aug 19 23:49:39 2014 @@ -47,8 +47,8 @@ import org.apache.hadoop.net.NodeBase; public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault { protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, DatanodeManager datanodeManager) { + initialize(conf, stats, clusterMap, host2datanodeMap); } protected BlockPlacementPolicyWithNodeGroup() { @@ -56,8 +56,9 @@ public class BlockPlacementPolicyWithNod @Override public void initialize(Configuration conf, FSClusterStats stats, - NetworkTopology clusterMap) { - super.initialize(conf, stats, clusterMap); + NetworkTopology clusterMap, + Host2NodesMap host2datanodeMap) { + super.initialize(conf, stats, clusterMap, host2datanodeMap); } /** choose local node of localMachine as the target. @@ -69,7 +70,8 @@ public class BlockPlacementPolicyWithNod protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeStorageInfo> results, boolean avoidStaleNodes, - StorageType storageType) throws NotEnoughReplicasException { + StorageType storageType, boolean fallbackToLocalRack + ) throws NotEnoughReplicasException { // if no local machine, randomly choose one node if (localMachine == null) return chooseRandom(NodeBase.ROOT, excludedNodes, @@ -96,6 +98,10 @@ public class BlockPlacementPolicyWithNod if (chosenStorage != null) { return chosenStorage; } + + if (!fallbackToLocalRack) { + return null; + } // try a node on local rack return chooseLocalRack(localMachine, excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType); @@ -241,6 +247,36 @@ public class BlockPlacementPolicyWithNod countOfExcludedNodes++; } } + + countOfExcludedNodes += addDependentNodesToExcludedNodes( + chosenNode, excludedNodes); + return countOfExcludedNodes; + } + + /** + * Add all nodes from a dependent nodes list to excludedNodes. 
+ * @return number of new excluded nodes + */ + private int addDependentNodesToExcludedNodes(DatanodeDescriptor chosenNode, + Set<Node> excludedNodes) { + if (this.host2datanodeMap == null) { + return 0; + } + int countOfExcludedNodes = 0; + for(String hostname : chosenNode.getDependentHostNames()) { + DatanodeDescriptor node = + this.host2datanodeMap.getDataNodeByHostName(hostname); + if(node!=null) { + if (excludedNodes.add(node)) { + countOfExcludedNodes++; + } + } else { + LOG.warn("Not able to find datanode " + hostname + + " which has dependency with datanode " + + chosenNode.getHostName()); + } + } + return countOfExcludedNodes; } @@ -255,9 +291,9 @@ public class BlockPlacementPolicyWithNod * If first is empty, then pick second. */ @Override - public Collection<DatanodeDescriptor> pickupReplicaSet( - Collection<DatanodeDescriptor> first, - Collection<DatanodeDescriptor> second) { + public Collection<DatanodeStorageInfo> pickupReplicaSet( + Collection<DatanodeStorageInfo> first, + Collection<DatanodeStorageInfo> second) { // If no replica within same rack, return directly. if (first.isEmpty()) { return second; @@ -265,25 +301,24 @@ public class BlockPlacementPolicyWithNod // Split data nodes in the first set into two sets, // moreThanOne contains nodes on nodegroup with more than one replica // exactlyOne contains the remaining nodes - Map<String, List<DatanodeDescriptor>> nodeGroupMap = - new HashMap<String, List<DatanodeDescriptor>>(); + Map<String, List<DatanodeStorageInfo>> nodeGroupMap = + new HashMap<String, List<DatanodeStorageInfo>>(); - for(DatanodeDescriptor node : first) { - final String nodeGroupName = - NetworkTopology.getLastHalf(node.getNetworkLocation()); - List<DatanodeDescriptor> datanodeList = - nodeGroupMap.get(nodeGroupName); - if (datanodeList == null) { - datanodeList = new ArrayList<DatanodeDescriptor>(); - nodeGroupMap.put(nodeGroupName, datanodeList); + for(DatanodeStorageInfo storage : first) { + final String nodeGroupName = NetworkTopology.getLastHalf( + storage.getDatanodeDescriptor().getNetworkLocation()); + List<DatanodeStorageInfo> storageList = nodeGroupMap.get(nodeGroupName); + if (storageList == null) { + storageList = new ArrayList<DatanodeStorageInfo>(); + nodeGroupMap.put(nodeGroupName, storageList); } - datanodeList.add(node); + storageList.add(storage); } - final List<DatanodeDescriptor> moreThanOne = new ArrayList<DatanodeDescriptor>(); - final List<DatanodeDescriptor> exactlyOne = new ArrayList<DatanodeDescriptor>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); // split nodes into two sets - for(List<DatanodeDescriptor> datanodeList : nodeGroupMap.values()) { + for(List<DatanodeStorageInfo> datanodeList : nodeGroupMap.values()) { if (datanodeList.size() == 1 ) { // exactlyOne contains nodes on nodegroup with exactly one replica exactlyOne.add(datanodeList.get(0)); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- 
hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Tue Aug 19 23:49:39 2014 @@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; -import org.apache.hadoop.util.LightWeightGSet.SetIterator; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -217,9 +217,14 @@ class BlocksMap { BlockInfo currentBlock = blocks.get(newBlock); assert currentBlock != null : "the block is not in blocksMap"; // replace block in data-node lists - for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) { - DatanodeDescriptor dn = currentBlock.getDatanode(idx); - dn.replaceBlock(currentBlock, newBlock); + for (int i = currentBlock.numNodes() - 1; i >= 0; i--) { + final DatanodeDescriptor dn = currentBlock.getDatanode(i); + final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn); + final boolean removed = storage.removeBlock(currentBlock); + Preconditions.checkState(removed, "currentBlock not found."); + + final boolean added = storage.addBlock(newBlock); + Preconditions.checkState(added, "newBlock already exists."); } // replace block in the map itself blocks.put(newBlock); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Tue Aug 19 23:49:39 2014 @@ -33,8 +33,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.protocol.Block; @@ -53,8 +51,11 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.util.ReadOnlyList; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.base.Preconditions; +; /** * Scans the namesystem, scheduling blocks to be cached as appropriate.
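The BlocksMap.replaceBlock rework above swaps the old dn.replaceBlock call for an explicit remove-then-add on the owning storage, with Guava Preconditions promoting silent bookkeeping mismatches to hard failures. A toy illustration of that pattern under the same assumption (a storage tracks its blocks in a set; Storage here is not the HDFS DatanodeStorageInfo):

    import java.util.HashSet;
    import java.util.Set;
    import com.google.common.base.Preconditions;

    class Storage {
      private final Set<String> blocks = new HashSet<String>();

      boolean addBlock(String b)    { return blocks.add(b); }
      boolean removeBlock(String b) { return blocks.remove(b); }

      // Remove-then-add with checkState, mirroring the replaceBlock loop:
      // a missing currentBlock or a pre-existing newBlock indicates a
      // corrupted block-to-storage mapping and now fails fast.
      void replaceBlock(String currentBlock, String newBlock) {
        final boolean removed = removeBlock(currentBlock);
        Preconditions.checkState(removed, "currentBlock not found.");
        final boolean added = addBlock(newBlock);
        Preconditions.checkState(added, "newBlock already exists.");
      }
    }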
@@ -65,8 +66,8 @@ import com.google.common.base.Preconditi @InterfaceAudience.LimitedPrivate({"HDFS"}) public class CacheReplicationMonitor extends Thread implements Closeable { - private static final Log LOG = - LogFactory.getLog(CacheReplicationMonitor.class); + private static final Logger LOG = + LoggerFactory.getLogger(CacheReplicationMonitor.class); private final FSNamesystem namesystem; @@ -103,21 +104,21 @@ public class CacheReplicationMonitor ext private final Condition scanFinished; /** - * Whether there are pending CacheManager operations that necessitate a - * CacheReplicationMonitor rescan. Protected by the CRM lock. + * The number of rescans completed. Used to wait for scans to finish. + * Protected by the CacheReplicationMonitor lock. */ - private boolean needsRescan = true; + private long completedScanCount = 0; /** - * Whether we are currently doing a rescan. Protected by the CRM lock. + * The scan we're currently performing, or -1 if no scan is in progress. + * Protected by the CacheReplicationMonitor lock. */ - private boolean isScanning = false; + private long curScanCount = -1; /** - * The number of rescans completed. Used to wait for scans to finish. - * Protected by the CacheReplicationMonitor lock. + * The number of rescans we need to complete. Protected by the CRM lock. */ - private long scanCount = 0; + private long neededScanCount = 0; /** * True if this monitor should terminate. Protected by the CRM lock. @@ -168,7 +169,7 @@ public class CacheReplicationMonitor ext LOG.info("Shutting down CacheReplicationMonitor"); return; } - if (needsRescan) { + if (completedScanCount < neededScanCount) { LOG.info("Rescanning because of pending operations"); break; } @@ -181,8 +182,6 @@ public class CacheReplicationMonitor ext doRescan.await(delta, TimeUnit.MILLISECONDS); curTimeMs = Time.monotonicNow(); } - isScanning = true; - needsRescan = false; } finally { lock.unlock(); } @@ -193,8 +192,8 @@ public class CacheReplicationMonitor ext // Update synchronization-related variables. lock.lock(); try { - isScanning = false; - scanCount++; + completedScanCount = curScanCount; + curScanCount = -1; scanFinished.signalAll(); } finally { lock.unlock(); @@ -207,7 +206,7 @@ public class CacheReplicationMonitor ext LOG.info("Shutting down CacheReplicationMonitor."); return; } catch (Throwable t) { - LOG.fatal("Thread exiting", t); + LOG.error("Thread exiting", t); terminate(1, t); } } @@ -225,16 +224,15 @@ public class CacheReplicationMonitor ext "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); - if (!needsRescan) { + if (neededScanCount <= completedScanCount) { return; } // If no scan is already ongoing, mark the CRM as dirty and kick - if (!isScanning) { + if (curScanCount < 0) { doRescan.signal(); } // Wait until the scan finishes and the count advances - final long startCount = scanCount; - while ((!shutdown) && (startCount >= scanCount)) { + while ((!shutdown) && (completedScanCount < neededScanCount)) { try { scanFinished.await(); } catch (InterruptedException e) { @@ -252,7 +250,14 @@ public class CacheReplicationMonitor ext public void setNeedsRescan() { Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when setting the needsRescan bit."); - this.needsRescan = true; + if (curScanCount >= 0) { + // If there is a scan in progress, we need to wait for the scan after + // that. 
+ neededScanCount = curScanCount + 1; + } else { + // If there is no scan in progress, we need to wait for the next scan. + neededScanCount = completedScanCount + 1; + } } /** @@ -281,12 +286,19 @@ public class CacheReplicationMonitor ext private void rescan() throws InterruptedException { scannedDirectives = 0; scannedBlocks = 0; - namesystem.writeLock(); try { - if (shutdown) { - throw new InterruptedException("CacheReplicationMonitor was " + - "shut down."); + namesystem.writeLock(); + try { + lock.lock(); + if (shutdown) { + throw new InterruptedException("CacheReplicationMonitor was " + + "shut down."); + } + curScanCount = completedScanCount + 1; + } finally { + lock.unlock(); } + resetStatistics(); rescanCacheDirectives(); rescanCachedBlockMap(); @@ -316,11 +328,8 @@ public class CacheReplicationMonitor ext scannedDirectives++; // Skip processing this entry if it has expired if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) { - if (LOG.isDebugEnabled()) { - LOG.debug("Directive " + directive.getId() + ": the directive " + - "expired at " + directive.getExpiryTime() + " (now = " + - now + ")"); - } + LOG.debug("Directive {}: the directive expired at {} (now = {})", + directive.getId(), directive.getExpiryTime(), now); continue; } String path = directive.getPath(); @@ -329,17 +338,14 @@ public class CacheReplicationMonitor ext node = fsDir.getINode(path); } catch (UnresolvedLinkException e) { // We don't cache through symlinks - if (LOG.isDebugEnabled()) { - LOG.debug("Directive " + directive.getId() + - ": got UnresolvedLinkException while resolving path " + path); - } + LOG.debug("Directive {}: got UnresolvedLinkException while resolving " + + "path {}", directive.getId(), path + ); continue; } if (node == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Directive " + directive.getId() + - ": No inode found at " + path); - } + LOG.debug("Directive {}: No inode found at {}", directive.getId(), + path); } else if (node.isDirectory()) { INodeDirectory dir = node.asDirectory(); ReadOnlyList<INode> children = dir @@ -352,10 +358,8 @@ public class CacheReplicationMonitor ext } else if (node.isFile()) { rescanFile(directive, node.asFile()); } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Directive " + directive.getId() + - ": ignoring non-directive, non-file inode " + node); - } + LOG.debug("Directive {}: ignoring non-directive, non-file inode {} ", + directive.getId(), node); } } } @@ -381,15 +385,13 @@ public class CacheReplicationMonitor ext // do not cache this file. CachePool pool = directive.getPool(); if (pool.getBytesNeeded() > pool.getLimit()) { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Directive %d: not scanning file %s because " + - "bytesNeeded for pool %s is %d, but the pool's limit is %d", - directive.getId(), - file.getFullPathName(), - pool.getPoolName(), - pool.getBytesNeeded(), - pool.getLimit())); - } + LOG.debug("Directive {}: not scanning file {} because " + + "bytesNeeded for pool {} is {}, but the pool's limit is {}", + directive.getId(), + file.getFullPathName(), + pool.getPoolName(), + pool.getBytesNeeded(), + pool.getLimit()); return; } @@ -397,11 +399,10 @@ public class CacheReplicationMonitor ext for (BlockInfo blockInfo : blockInfos) { if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) { // We don't try to cache blocks that are under construction. 
- if (LOG.isTraceEnabled()) { - LOG.trace("Directive " + directive.getId() + ": can't cache " + - "block " + blockInfo + " because it is in state " + - blockInfo.getBlockUCState() + ", not COMPLETE."); - } + LOG.trace("Directive {}: can't cache block {} because it is in state " + + "{}, not COMPLETE.", directive.getId(), blockInfo, + blockInfo.getBlockUCState() + ); continue; } Block block = new Block(blockInfo.getBlockId()); @@ -415,7 +416,7 @@ public class CacheReplicationMonitor ext // Update bytesUsed using the current replication levels. // Assumptions: we assume that all the blocks are the same length // on each datanode. We can assume this because we're only caching - // blocks in state COMMITTED. + // blocks in state COMPLETE. // Note that if two directives are caching the same block(s), they will // both get them added to their bytesCached. List<DatanodeDescriptor> cachedOn = @@ -441,21 +442,16 @@ public class CacheReplicationMonitor ext ocblock.setReplicationAndMark(directive.getReplication(), mark); } } - if (LOG.isTraceEnabled()) { - LOG.trace("Directive " + directive.getId() + ": setting replication " + - "for block " + blockInfo + " to " + ocblock.getReplication()); - } + LOG.trace("Directive {}: setting replication for block {} to {}", + directive.getId(), blockInfo, ocblock.getReplication()); } // Increment the "cached" statistics directive.addBytesCached(cachedTotal); if (cachedTotal == neededTotal) { directive.addFilesCached(1); } - if (LOG.isDebugEnabled()) { - LOG.debug("Directive " + directive.getId() + ": caching " + - file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal + - " bytes"); - } + LOG.debug("Directive {}: caching {}: {}/{} bytes", directive.getId(), + file.getFullPathName(), cachedTotal, neededTotal); } private String findReasonForNotCaching(CachedBlock cblock, @@ -512,11 +508,9 @@ public class CacheReplicationMonitor ext iter.hasNext(); ) { DatanodeDescriptor datanode = iter.next(); if (!cblock.isInList(datanode.getCached())) { - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + cblock.getBlockId() + ": removing from " + - "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() + - "because the DataNode uncached it."); - } + LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} " + + "because the DataNode uncached it.", cblock.getBlockId(), + datanode.getDatanodeUuid()); datanode.getPendingUncached().remove(cblock); iter.remove(); } @@ -526,10 +520,8 @@ public class CacheReplicationMonitor ext String reason = findReasonForNotCaching(cblock, blockInfo); int neededCached = 0; if (reason != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + cblock.getBlockId() + ": can't cache " + - "block because it is " + reason); - } + LOG.trace("Block {}: can't cache block because it is {}", + cblock.getBlockId(), reason); } else { neededCached = cblock.getReplication(); } @@ -541,12 +533,12 @@ public class CacheReplicationMonitor ext DatanodeDescriptor datanode = iter.next(); datanode.getPendingCached().remove(cblock); iter.remove(); - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + cblock.getBlockId() + ": removing from " + - "PENDING_CACHED for node " + datanode.getDatanodeUuid() + - "because we already have " + numCached + " cached " + - "replicas and we only need " + neededCached); - } + LOG.trace("Block {}: removing from PENDING_CACHED for node {}" + + "because we already have {} cached replicas and we only" + + " need {}", + cblock.getBlockId(), datanode.getDatanodeUuid(), numCached, + neededCached + ); } } if (numCached < 
neededCached) { @@ -556,12 +548,11 @@ public class CacheReplicationMonitor ext DatanodeDescriptor datanode = iter.next(); datanode.getPendingUncached().remove(cblock); iter.remove(); - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + cblock.getBlockId() + ": removing from " + - "PENDING_UNCACHED for node " + datanode.getDatanodeUuid() + - "because we only have " + numCached + " cached replicas " + - "and we need " + neededCached); - } + LOG.trace("Block {}: removing from PENDING_UNCACHED for node {} " + + "because we only have {} cached replicas and we need " + + "{}", cblock.getBlockId(), datanode.getDatanodeUuid(), + numCached, neededCached + ); } } int neededUncached = numCached - @@ -581,11 +572,10 @@ public class CacheReplicationMonitor ext pendingUncached.isEmpty() && pendingCached.isEmpty()) { // we have nothing more to do with this block. - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + cblock.getBlockId() + ": removing from " + - "cachedBlocks, since neededCached == 0, and " + - "pendingUncached and pendingCached are empty."); - } + LOG.trace("Block {}: removing from cachedBlocks, since neededCached " + + "== 0, and pendingUncached and pendingCached are empty.", + cblock.getBlockId() + ); cbIter.remove(); } } @@ -643,18 +633,14 @@ public class CacheReplicationMonitor ext BlockInfo blockInfo = blockManager. getStoredBlock(new Block(cachedBlock.getBlockId())); if (blockInfo == null) { - if (LOG.isDebugEnabled()) { - LOG.debug("Block " + cachedBlock.getBlockId() + ": can't add new " + - "cached replicas, because there is no record of this block " + - "on the NameNode."); - } + LOG.debug("Block {}: can't add new cached replicas," + + " because there is no record of this block " + + "on the NameNode.", cachedBlock.getBlockId()); return; } if (!blockInfo.isComplete()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Block " + cachedBlock.getBlockId() + ": can't cache this " + - "block, because it is not yet complete."); - } + LOG.debug("Block {}: can't cache this block, because it is not yet" + + " complete.", cachedBlock.getBlockId()); return; } // Filter the list of replicas to only the valid targets @@ -678,7 +664,7 @@ public class CacheReplicationMonitor ext if (pendingCached.contains(datanode) || cached.contains(datanode)) { continue; } - long pendingCapacity = datanode.getCacheRemaining(); + long pendingBytes = 0; // Subtract pending cached blocks from effective capacity Iterator<CachedBlock> it = datanode.getPendingCached().iterator(); while (it.hasNext()) { @@ -686,7 +672,7 @@ public class CacheReplicationMonitor ext BlockInfo info = blockManager.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { - pendingCapacity -= info.getNumBytes(); + pendingBytes -= info.getNumBytes(); } } it = datanode.getPendingUncached().iterator(); @@ -696,17 +682,17 @@ public class CacheReplicationMonitor ext BlockInfo info = blockManager.getStoredBlock(new Block(cBlock.getBlockId())); if (info != null) { - pendingCapacity += info.getNumBytes(); + pendingBytes += info.getNumBytes(); } } + long pendingCapacity = pendingBytes + datanode.getCacheRemaining(); if (pendingCapacity < blockInfo.getNumBytes()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + blockInfo.getBlockId() + ": DataNode " + - datanode.getDatanodeUuid() + " is not a valid possibility " + - "because the block has size " + blockInfo.getNumBytes() + ", but " + - "the DataNode only has " + datanode.getCacheRemaining() + " " + - "bytes of cache remaining."); - } + LOG.trace("Block {}: DataNode {} is not a valid 
possibility " + + "because the block has size {}, but the DataNode only has {}" + + "bytes of cache remaining ({} pending bytes, {} already cached.", + blockInfo.getBlockId(), datanode.getDatanodeUuid(), + blockInfo.getNumBytes(), pendingCapacity, pendingBytes, + datanode.getCacheRemaining()); outOfCapacity++; continue; } @@ -715,22 +701,20 @@ public class CacheReplicationMonitor ext List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities, neededCached, blockManager.getDatanodeManager().getStaleInterval()); for (DatanodeDescriptor datanode : chosen) { - if (LOG.isTraceEnabled()) { - LOG.trace("Block " + blockInfo.getBlockId() + ": added to " + - "PENDING_CACHED on DataNode " + datanode.getDatanodeUuid()); - } + LOG.trace("Block {}: added to PENDING_CACHED on DataNode {}", + blockInfo.getBlockId(), datanode.getDatanodeUuid()); pendingCached.add(datanode); boolean added = datanode.getPendingCached().add(cachedBlock); assert added; } // We were unable to satisfy the requested replication factor if (neededCached > chosen.size()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Block " + blockInfo.getBlockId() + ": we only have " + - (cachedBlock.getReplication() - neededCached + chosen.size()) + - " of " + cachedBlock.getReplication() + " cached replicas. " + - outOfCapacity + " DataNodes have insufficient cache capacity."); - } + LOG.debug("Block {}: we only have {} of {} cached replicas." + + " {} DataNodes have insufficient cache capacity.", + blockInfo.getBlockId(), + (cachedBlock.getReplication() - neededCached + chosen.size()), + cachedBlock.getReplication(), outOfCapacity + ); } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Tue Aug 19 23:49:39 2014 @@ -48,18 +48,6 @@ public class CorruptReplicasMap{ private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<Block, Map<DatanodeDescriptor, Reason>>(); - - /** - * Mark the block belonging to datanode as corrupt. - * - * @param blk Block to be added to CorruptReplicasMap - * @param dn DatanodeDescriptor which holds the corrupt replica - * @param reason a textual reason (for logging purposes) - */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, - String reason) { - addToCorruptReplicasMap(blk, dn, reason, Reason.NONE); - } /** * Mark the block belonging to datanode as corrupt. 
@@ -69,7 +57,7 @@ public class CorruptReplicasMap{ * @param reason a textual reason (for logging purposes) * @param reasonCode the enum representation of the reason */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, + void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, String reason, Reason reasonCode) { Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk); if (nodes == null) { @@ -127,7 +115,6 @@ public class CorruptReplicasMap{ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode, Reason reason) { Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk); - boolean removed = false; if (datanodes==null) return false; @@ -174,12 +161,12 @@ public class CorruptReplicasMap{ return ((nodes != null) && (nodes.contains(node))); } - public int numCorruptReplicas(Block blk) { + int numCorruptReplicas(Block blk) { Collection<DatanodeDescriptor> nodes = getNodes(blk); return (nodes == null) ? 0 : nodes.size(); } - public int size() { + int size() { return corruptReplicasMap.size(); }
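Finally, the logging churn throughout the CacheReplicationMonitor diff is the commons-logging to SLF4J migration: parameterized {} messages replace the isDebugEnabled()/isTraceEnabled() guards plus string concatenation, and LOG.fatal becomes LOG.error because SLF4J has no FATAL level. A small illustration of the idiom (the class and values here are hypothetical):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

  public static void main(String[] args) {
    long directiveId = 42;
    String path = "/cache/data";

    // Old commons-logging idiom: guard plus concatenation.
    //   if (LOG.isDebugEnabled()) {
    //     LOG.debug("Directive " + directiveId + ": no inode found at " + path);
    //   }

    // SLF4J idiom: the message is only formatted when DEBUG is enabled,
    // so both the guard and the concatenation disappear.
    LOG.debug("Directive {}: no inode found at {}", directiveId, path);

    // A guard still pays off when *computing* an argument is expensive,
    // rather than merely formatting it.
    if (LOG.isTraceEnabled()) {
      LOG.trace("Cache state: {}", expensiveSummary());
    }
  }

  private static String expensiveSummary() {
    return "summary";  // stand-in for a costly computation
  }
}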