Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Aug 20 01:34:29 2014 @@ -1079,6 +1079,7 @@ public class BlockManager { * Mark the block belonging to datanode as corrupt * @param blk Block to be marked as corrupt * @param dn Datanode which holds the corrupt replica + * @param storageID if known, null otherwise. * @param reason a textual reason why the block should be marked corrupt, * for logging purposes */ @@ -1095,19 +1096,29 @@ public class BlockManager { + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, - blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), - dn, storageID); - } - private void markBlockAsCorrupt(BlockToMarkCorrupt b, - DatanodeInfo dn, String storageID) throws IOException { DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { - throw new IOException("Cannot mark " + b + throw new IOException("Cannot mark " + blk + " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid() + ") does not exist"); } + + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), + storageID == null ? null : node.getStorageInfo(storageID), + node); + } + + /** + * + * @param b + * @param storageInfo storage that contains the block, if known. null otherwise. + * @throws IOException + */ + private void markBlockAsCorrupt(BlockToMarkCorrupt b, + DatanodeStorageInfo storageInfo, + DatanodeDescriptor node) throws IOException { BlockCollection bc = b.corrupted.getBlockCollection(); if (bc == null) { @@ -1118,7 +1129,9 @@ public class BlockManager { } // Add replica to the data-node if it is not already there - node.addBlock(storageID, b.stored); + if (storageInfo != null) { + storageInfo.addBlock(b.stored); + } // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, @@ -1457,7 +1470,7 @@ public class BlockManager { * @throws IOException * if the number of targets < minimum replication. * @see BlockPlacementPolicy#chooseTarget(String, int, Node, - * List, boolean, Set, long) + * List, boolean, Set, long, StorageType) */ public DatanodeStorageInfo[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -1694,7 +1707,7 @@ public class BlockManager { * @throws IOException */ public boolean processReport(final DatanodeID nodeID, - final DatanodeStorage storage, final String poolId, + final DatanodeStorage storage, final BlockListAsLongs newReport) throws IOException { namesystem.writeLock(); final long startTime = Time.now(); //after acquiring write lock @@ -1726,9 +1739,9 @@ public class BlockManager { if (storageInfo.numBlocks() == 0) { // The first block report can be processed a lot more efficiently than // ordinary block reports. 
This shortens restart times. - processFirstBlockReport(node, storage.getStorageID(), newReport); + processFirstBlockReport(storageInfo, newReport); } else { - processReport(node, storage, newReport); + processReport(storageInfo, newReport); } // Now that we have an up-to-date block report, we know that any @@ -1790,9 +1803,8 @@ public class BlockManager { } } - private void processReport(final DatanodeDescriptor node, - final DatanodeStorage storage, - final BlockListAsLongs report) throws IOException { + private void processReport(final DatanodeStorageInfo storageInfo, + final BlockListAsLongs report) throws IOException { // Normal case: // Modify the (block-->datanode) map, according to the difference // between the old and new block report. @@ -1802,19 +1814,20 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - reportDiff(node, storage, report, + reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); - + + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Process the blocks on each queue for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storage.getStorageID()); + addStoredBlockUnderConstruction(b, storageInfo); } for (Block b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storage.getStorageID(), null, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -1828,7 +1841,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storage.getStorageID()); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -1839,16 +1852,16 @@ public class BlockManager { * a toRemove list (since there won't be any). It also silently discards * any invalid blocks, thereby deferring their processing until * the next block report. - * @param node - DatanodeDescriptor of the node that sent the report + * @param storageInfo - DatanodeStorageInfo that sent the report * @param report - the initial block report, to be processed * @throws IOException */ - private void processFirstBlockReport(final DatanodeDescriptor node, - final String storageID, + private void processFirstBlockReport( + final DatanodeStorageInfo storageInfo, final BlockListAsLongs report) throws IOException { if (report == null) return; assert (namesystem.hasWriteLock()); - assert (node.getStorageInfo(storageID).numBlocks() == 0); + assert (storageInfo.numBlocks() == 0); BlockReportIterator itBR = report.getBlockReportIterator(); while(itBR.hasNext()) { @@ -1857,7 +1870,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(iblk)) { - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); continue; } @@ -1869,15 +1882,16 @@ public class BlockManager { // If block is corrupt, mark it and continue to next block. 
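// The reporting datanode is recovered from the storage that sent the report
// (storageInfo.getDatanodeDescriptor()), so no separate descriptor/storageID
// pair needs to be threaded through the corrupt-replica check.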
BlockUCState ucState = storedBlock.getBlockUCState(); BlockToMarkCorrupt c = checkReplicaCorrupt( - iblk, reportedState, storedBlock, ucState, node); + iblk, reportedState, storedBlock, ucState, + storageInfo.getDatanodeDescriptor()); if (c != null) { if (shouldPostponeBlocksFromFuture) { // In the Standby, we may receive a block report for a file that we // just have an out-of-date gen-stamp or state for, for example. - queueReportedBlock(node, storageID, iblk, reportedState, + queueReportedBlock(storageInfo, iblk, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { - markBlockAsCorrupt(c, node, storageID); + markBlockAsCorrupt(c, storageInfo, storageInfo.getDatanodeDescriptor()); } continue; } @@ -1885,7 +1899,7 @@ public class BlockManager { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { ((BlockInfoUnderConstruction)storedBlock).addReplicaIfNotPresent( - node.getStorageInfo(storageID), iblk, reportedState); + storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 @@ -1898,12 +1912,12 @@ public class BlockManager { } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, node, storageID); + addStoredBlockImmediate(storedBlock, storageInfo); } } } - private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage, + private void reportDiff(DatanodeStorageInfo storageInfo, BlockListAsLongs newReport, Collection<BlockInfo> toAdd, // add to DatanodeDescriptor Collection<Block> toRemove, // remove from DatanodeDescriptor @@ -1911,8 +1925,6 @@ public class BlockManager { Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list - final DatanodeStorageInfo storageInfo = dn.getStorageInfo(storage.getStorageID()); - // place a delimiter in the list which separates blocks // that have been reported from those that have not BlockInfo delimiter = new BlockInfo(new Block(), 1); @@ -1929,7 +1941,7 @@ public class BlockManager { while(itBR.hasNext()) { Block iblk = itBR.next(); ReplicaState iState = itBR.getCurrentReplicaState(); - BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(), + BlockInfo storedBlock = processReportedBlock(storageInfo, iblk, iState, toAdd, toInvalidate, toCorrupt, toUC); // move block to the head of the list @@ -1966,7 +1978,7 @@ public class BlockManager { * BlockInfoUnderConstruction's list of replicas.</li> * </ol> * - * @param dn descriptor for the datanode that made the report + * @param storageInfo DatanodeStorageInfo that sent the report. * @param block reported block replica * @param reportedState reported replica state * @param toAdd add to DatanodeDescriptor @@ -1978,14 +1990,16 @@ public class BlockManager { * @return the up-to-date stored block, if it should be kept. * Otherwise, null. 
*/ - private BlockInfo processReportedBlock(final DatanodeDescriptor dn, - final String storageID, + private BlockInfo processReportedBlock( + final DatanodeStorageInfo storageInfo, final Block block, final ReplicaState reportedState, final Collection<BlockInfo> toAdd, final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { + DatanodeDescriptor dn = storageInfo.getDatanodeDescriptor(); + if(LOG.isDebugEnabled()) { LOG.debug("Reported block " + block + " on " + dn + " size " + block.getNumBytes() @@ -1994,7 +2008,7 @@ public class BlockManager { if (shouldPostponeBlocksFromFuture && namesystem.isGenStampInFuture(block)) { - queueReportedBlock(dn, storageID, block, reportedState, + queueReportedBlock(storageInfo, block, reportedState, QUEUE_REASON_FUTURE_GENSTAMP); return null; } @@ -2034,7 +2048,7 @@ public class BlockManager { // TODO: Pretty confident this should be s/storedBlock/block below, // since we should be postponing the info of the reported block, not // the stored block. See HDFS-6289 for more context. - queueReportedBlock(dn, storageID, storedBlock, reportedState, + queueReportedBlock(storageInfo, storedBlock, reportedState, QUEUE_REASON_CORRUPT_STATE); } else { toCorrupt.add(c); @@ -2051,7 +2065,7 @@ public class BlockManager { // Add replica if appropriate. If the replica was previously corrupt // but now okay, it might need to be updated. if (reportedState == ReplicaState.FINALIZED - && (storedBlock.findDatanode(dn) < 0 + && (!storedBlock.findDatanode(dn) || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { toAdd.add(storedBlock); } @@ -2063,17 +2077,17 @@ public class BlockManager { * standby node. @see PendingDataNodeMessages. * @param reason a textual reason to report in the debug logs */ - private void queueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + private void queueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, String reason) { assert shouldPostponeBlocksFromFuture; if (LOG.isDebugEnabled()) { LOG.debug("Queueing reported block " + block + " in state " + reportedState + - " from datanode " + dn + " for later processing " + - "because " + reason + "."); + " from datanode " + storageInfo.getDatanodeDescriptor() + + " for later processing because " + reason + "."); } - pendingDNMessages.enqueueReportedBlock(dn, storageID, block, reportedState); + pendingDNMessages.enqueueReportedBlock(storageInfo, block, reportedState); } /** @@ -2096,7 +2110,7 @@ public class BlockManager { if (LOG.isDebugEnabled()) { LOG.debug("Processing previouly queued message " + rbi); } - processAndHandleReportedBlock(rbi.getNode(), rbi.getStorageID(), + processAndHandleReportedBlock(rbi.getStorageInfo(), rbi.getBlock(), rbi.getReportedState(), null); } } @@ -2153,6 +2167,16 @@ public class BlockManager { } else { return null; // not corrupt } + case UNDER_CONSTRUCTION: + if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { + final long reportedGS = reported.getGenerationStamp(); + return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " + + ucState + " and reported state " + reportedState + + ", But reported genstamp " + reportedGS + + " does not match genstamp in block map " + + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); + } + return null; default: return null; } @@ -2216,19 +2240,20 @@ public class BlockManager { } void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, - DatanodeDescriptor node, 
String storageID) throws IOException { + DatanodeStorageInfo storageInfo) throws IOException { BlockInfoUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent(node.getStorageInfo(storageID), - ucBlock.reportedBlock, ucBlock.reportedState); + block.addReplicaIfNotPresent( + storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); - if (ucBlock.reportedState == ReplicaState.FINALIZED && block.findDatanode(node) < 0) { - addStoredBlock(block, node, storageID, null, true); + if (ucBlock.reportedState == ReplicaState.FINALIZED && + !block.findDatanode(storageInfo.getDatanodeDescriptor())) { + addStoredBlock(block, storageInfo, null, true); } } /** * Faster version of - * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, String, DatanodeDescriptor, boolean)} + * {@link #addStoredBlock(BlockInfo, DatanodeStorageInfo, DatanodeDescriptor, boolean)} * , intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock(). Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from @@ -2239,17 +2264,17 @@ public class BlockManager { * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, - DatanodeDescriptor node, String storageID) + DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, node, storageID, null, false); + addStoredBlock(storedBlock, storageInfo, null, false); return; } // just add it - node.addBlock(storageID, storedBlock); + storageInfo.addBlock(storedBlock); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); @@ -2271,13 +2296,13 @@ public class BlockManager { * @return the block that is stored in blockMap. */ private Block addStoredBlock(final BlockInfo block, - DatanodeDescriptor node, - String storageID, + DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); if (block instanceof BlockInfoUnderConstruction) { //refresh our copy in case the block got completed in another thread storedBlock = blocksMap.getStoredBlock(block); @@ -2297,7 +2322,7 @@ public class BlockManager { assert bc != null : "Block must belong to a file"; // add block to the datanode - boolean added = node.addBlock(storageID, storedBlock); + boolean added = storageInfo.addBlock(storedBlock); int curReplicaDelta; if (added) { @@ -2826,12 +2851,15 @@ public class BlockManager { } else { final String[] datanodeUuids = new String[locations.size()]; final String[] storageIDs = new String[datanodeUuids.length]; + final StorageType[] storageTypes = new StorageType[datanodeUuids.length]; for(int i = 0; i < locations.size(); i++) { final DatanodeStorageInfo s = locations.get(i); datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid(); storageIDs[i] = s.getStorageID(); + storageTypes[i] = s.getStorageType(); } - results.add(new BlockWithLocations(block, datanodeUuids, storageIDs)); + results.add(new BlockWithLocations(block, datanodeUuids, storageIDs, + storageTypes)); return block.getNumBytes(); } } @@ -2840,8 +2868,9 @@ public class BlockManager { * The given node is reporting that it received a certain block. 
*/ @VisibleForTesting - void addBlock(DatanodeDescriptor node, String storageID, Block block, String delHint) + void addBlock(DatanodeStorageInfo storageInfo, Block block, String delHint) throws IOException { + DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); // Decrement number of blocks scheduled to this datanode. // for a retry request (of DatanodeProtocol#blockReceivedAndDeleted with // RECEIVED_BLOCK), we currently also decrease the approximate number. @@ -2861,12 +2890,12 @@ public class BlockManager { // Modify the blocks->datanode map and node's map. // pendingReplications.decrement(block, node); - processAndHandleReportedBlock(node, storageID, block, ReplicaState.FINALIZED, + processAndHandleReportedBlock(storageInfo, block, ReplicaState.FINALIZED, delHintNode); } - private void processAndHandleReportedBlock(DatanodeDescriptor node, - String storageID, Block block, + private void processAndHandleReportedBlock( + DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState, DatanodeDescriptor delHintNode) throws IOException { // blockReceived reports a finalized block @@ -2874,7 +2903,9 @@ public class BlockManager { Collection<Block> toInvalidate = new LinkedList<Block>(); Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); - processReportedBlock(node, storageID, block, reportedState, + final DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); + + processReportedBlock(storageInfo, block, reportedState, toAdd, toInvalidate, toCorrupt, toUC); // the block is only in one of the to-do lists // if it is in none then data-node already has it @@ -2882,11 +2913,11 @@ public class BlockManager { : "The block should be only in one of the lists."; for (StatefulBlockInfo b : toUC) { - addStoredBlockUnderConstruction(b, node, storageID); + addStoredBlockUnderConstruction(b, storageInfo); } long numBlocksLogged = 0; for (BlockInfo b : toAdd) { - addStoredBlock(b, node, storageID, delHintNode, numBlocksLogged < maxNumBlocksToLog); + addStoredBlock(b, storageInfo, delHintNode, numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2900,7 +2931,7 @@ public class BlockManager { addToInvalidates(b, node); } for (BlockToMarkCorrupt b : toCorrupt) { - markBlockAsCorrupt(b, node, storageID); + markBlockAsCorrupt(b, storageInfo, node); } } @@ -2927,13 +2958,15 @@ public class BlockManager { "Got incremental block report from unregistered or dead node"); } - if (node.getStorageInfo(srdb.getStorage().getStorageID()) == null) { + DatanodeStorageInfo storageInfo = + node.getStorageInfo(srdb.getStorage().getStorageID()); + if (storageInfo == null) { // The DataNode is reporting an unknown storage. Usually the NN learns // about new storages from heartbeats but during NN restart we may // receive a block report or incremental report before the heartbeat. // We must handle this for protocol compatibility. This issue was // uncovered by HDFS-6094. 
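// The DatanodeStorageInfo returned by updateStorage() is kept so the
// per-block handling below can use the storage directly instead of
// re-resolving it by storage ID for every reported block.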
- node.updateStorage(srdb.getStorage()); + storageInfo = node.updateStorage(srdb.getStorage()); } for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) { @@ -2943,14 +2976,13 @@ public class BlockManager { deleted++; break; case RECEIVED_BLOCK: - addBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), rdbi.getDelHints()); + addBlock(storageInfo, rdbi.getBlock(), rdbi.getDelHints()); received++; break; case RECEIVING_BLOCK: receiving++; - processAndHandleReportedBlock(node, srdb.getStorage().getStorageID(), - rdbi.getBlock(), ReplicaState.RBW, null); + processAndHandleReportedBlock(storageInfo, rdbi.getBlock(), + ReplicaState.RBW, null); break; default: String msg = @@ -3142,6 +3174,15 @@ public class BlockManager { } } } + + if (!status && !srcNode.isAlive) { + LOG.warn("srcNode " + srcNode + " is dead " + + "when decommission is in progress. Continue to mark " + + "it as decommission in progress. In that way, when it rejoins the " + + "cluster it can continue the decommission process."); + status = true; + } + srcNode.decommissioningStatus.set(underReplicatedBlocks, decommissionOnlyReplicas, underReplicatedInOpenFiles);
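The recurring pattern in the BlockManager changes above is collapsing every (DatanodeDescriptor, storageID) parameter pair into a single DatanodeStorageInfo that is resolved once at the report boundary. Below is a minimal, hypothetical sketch of that shape — the classes and method bodies are simplified stand-ins for illustration, not the real HDFS types:

// Hypothetical stand-ins for the HDFS types; names simplified for illustration.
import java.util.HashMap;
import java.util.Map;

class StorageParameterSketch {
  static class DatanodeDescriptor {
    private final Map<String, DatanodeStorageInfo> storageMap = new HashMap<>();
    DatanodeStorageInfo getStorageInfo(String storageID) {
      return storageMap.get(storageID);  // null for an unknown storage
    }
  }

  static class DatanodeStorageInfo {
    private final DatanodeDescriptor dn;
    private int numBlocks;
    DatanodeStorageInfo(DatanodeDescriptor dn) { this.dn = dn; }
    DatanodeDescriptor getDatanodeDescriptor() { return dn; }
    void addBlock(String blockId) { numBlocks++; }
  }

  // Before: each layer carries the pair and re-resolves the storage by ID.
  static void addStoredBlockOld(DatanodeDescriptor node, String storageID,
      String blockId) {
    DatanodeStorageInfo s = node.getStorageInfo(storageID);
    if (s != null) {
      s.addBlock(blockId);
    }
  }

  // After: resolve once at the report boundary and pass the storage itself;
  // the owning datanode stays reachable via getDatanodeDescriptor().
  static void addStoredBlockNew(DatanodeStorageInfo storageInfo, String blockId) {
    storageInfo.addBlock(blockId);
    DatanodeDescriptor node = storageInfo.getDatanodeDescriptor();
    // ... node-level bookkeeping (invalidation queues etc.) stays keyed by node ...
  }
}

Resolving the storage once also surfaces the unknown-storage case (a null DatanodeStorageInfo) at a single call site, which is what the markBlockAsCorrupt signature change and the incremental-report fix above rely on.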
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Wed Aug 20 01:34:29 2014 @@ -23,8 +23,8 @@ import org.apache.hadoop.hdfs.protocol.B import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.GSet; import org.apache.hadoop.util.LightWeightGSet; -import org.apache.hadoop.util.LightWeightGSet.SetIterator; +import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; @@ -217,9 +217,14 @@ class BlocksMap { BlockInfo currentBlock = blocks.get(newBlock); assert currentBlock != null : "the block if not in blocksMap"; // replace block in data-node lists - for(int idx = currentBlock.numNodes()-1; idx >= 0; idx--) { - DatanodeDescriptor dn = currentBlock.getDatanode(idx); - dn.replaceBlock(currentBlock, newBlock); + for (int i = currentBlock.numNodes() - 1; i >= 0; i--) { + final DatanodeDescriptor dn = currentBlock.getDatanode(i); + final DatanodeStorageInfo storage = currentBlock.findStorageInfo(dn); + final boolean removed = storage.removeBlock(currentBlock); + Preconditions.checkState(removed, "currentBlock not found."); + + final boolean added = storage.addBlock(newBlock); + Preconditions.checkState(added, "newBlock already exists."); } // replace block in the map itself blocks.put(newBlock); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Wed Aug 20 01:34:29 2014 @@ -104,21 +104,21 @@ public class CacheReplicationMonitor ext private final Condition scanFinished; /** - * Whether there are pending CacheManager operations that necessitate a - * CacheReplicationMonitor rescan. Protected by the CRM lock. + * The number of rescans completed. Used to wait for scans to finish. + * Protected by the CacheReplicationMonitor lock. */ - private boolean needsRescan = true; + private long completedScanCount = 0; /** - * Whether we are currently doing a rescan. Protected by the CRM lock. + * The scan we're currently performing, or -1 if no scan is in progress. + * Protected by the CacheReplicationMonitor lock. 
*/ - private boolean isScanning = false; + private long curScanCount = -1; /** - * The number of rescans completed. Used to wait for scans to finish. - * Protected by the CacheReplicationMonitor lock. + * The number of rescans we need to complete. Protected by the CRM lock. */ - private long scanCount = 0; + private long neededScanCount = 0; /** * True if this monitor should terminate. Protected by the CRM lock. @@ -169,7 +169,7 @@ public class CacheReplicationMonitor ext LOG.info("Shutting down CacheReplicationMonitor"); return; } - if (needsRescan) { + if (completedScanCount < neededScanCount) { LOG.info("Rescanning because of pending operations"); break; } @@ -182,8 +182,6 @@ public class CacheReplicationMonitor ext doRescan.await(delta, TimeUnit.MILLISECONDS); curTimeMs = Time.monotonicNow(); } - isScanning = true; - needsRescan = false; } finally { lock.unlock(); } @@ -194,8 +192,8 @@ public class CacheReplicationMonitor ext // Update synchronization-related variables. lock.lock(); try { - isScanning = false; - scanCount++; + completedScanCount = curScanCount; + curScanCount = -1; scanFinished.signalAll(); } finally { lock.unlock(); @@ -226,16 +224,15 @@ public class CacheReplicationMonitor ext "Must not hold the FSN write lock when waiting for a rescan."); Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when waiting for a rescan."); - if (!needsRescan) { + if (neededScanCount <= completedScanCount) { return; } // If no scan is already ongoing, mark the CRM as dirty and kick - if (!isScanning) { + if (curScanCount < 0) { doRescan.signal(); } // Wait until the scan finishes and the count advances - final long startCount = scanCount; - while ((!shutdown) && (startCount >= scanCount)) { + while ((!shutdown) && (completedScanCount < neededScanCount)) { try { scanFinished.await(); } catch (InterruptedException e) { @@ -253,7 +250,14 @@ public class CacheReplicationMonitor ext public void setNeedsRescan() { Preconditions.checkArgument(lock.isHeldByCurrentThread(), "Must hold the CRM lock when setting the needsRescan bit."); - this.needsRescan = true; + if (curScanCount >= 0) { + // If there is a scan in progress, we need to wait for the scan after + // that. + neededScanCount = curScanCount + 1; + } else { + // If there is no scan in progress, we need to wait for the next scan. 
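+ // Either way, neededScanCount names the first scan that starts strictly
+ // after this call; waitForRescanIfNeeded() blocks until completedScanCount
+ // catches up to it.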
+ neededScanCount = completedScanCount + 1; + } } /** @@ -282,12 +286,19 @@ public class CacheReplicationMonitor ext private void rescan() throws InterruptedException { scannedDirectives = 0; scannedBlocks = 0; - namesystem.writeLock(); try { - if (shutdown) { - throw new InterruptedException("CacheReplicationMonitor was " + - "shut down."); + namesystem.writeLock(); + try { + lock.lock(); + if (shutdown) { + throw new InterruptedException("CacheReplicationMonitor was " + + "shut down."); + } + curScanCount = completedScanCount + 1; + } finally { + lock.unlock(); } + resetStatistics(); rescanCacheDirectives(); rescanCachedBlockMap(); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java Wed Aug 20 01:34:29 2014 @@ -48,18 +48,6 @@ public class CorruptReplicasMap{ private final SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap = new TreeMap<Block, Map<DatanodeDescriptor, Reason>>(); - - /** - * Mark the block belonging to datanode as corrupt. - * - * @param blk Block to be added to CorruptReplicasMap - * @param dn DatanodeDescriptor which holds the corrupt replica - * @param reason a textual reason (for logging purposes) - */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, - String reason) { - addToCorruptReplicasMap(blk, dn, reason, Reason.NONE); - } /** * Mark the block belonging to datanode as corrupt. @@ -69,7 +57,7 @@ public class CorruptReplicasMap{ * @param reason a textual reason (for logging purposes) * @param reasonCode the enum representation of the reason */ - public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, + void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn, String reason, Reason reasonCode) { Map <DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk); if (nodes == null) { @@ -127,7 +115,6 @@ public class CorruptReplicasMap{ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode, Reason reason) { Map <DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk); - boolean removed = false; if (datanodes==null) return false; @@ -174,12 +161,12 @@ public class CorruptReplicasMap{ return ((nodes != null) && (nodes.contains(node))); } - public int numCorruptReplicas(Block blk) { + int numCorruptReplicas(Block blk) { Collection<DatanodeDescriptor> nodes = getNodes(blk); return (nodes == null) ? 
0 : nodes.size(); } - public int size() { + int size() { return corruptReplicasMap.size(); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Aug 20 01:34:29 2014 @@ -234,18 +234,6 @@ public class DatanodeDescriptor extends updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0); } - /** - * Add data-node to the block. Add block to the head of the list of blocks - * belonging to the data-node. - */ - public boolean addBlock(String storageID, BlockInfo b) { - DatanodeStorageInfo s = getStorageInfo(storageID); - if (s != null) { - return s.addBlock(b); - } - return false; - } - @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { @@ -259,6 +247,15 @@ public class DatanodeDescriptor extends } } + public StorageReport[] getStorageReports() { + final DatanodeStorageInfo[] infos = getStorageInfos(); + final StorageReport[] reports = new StorageReport[infos.length]; + for(int i = 0; i < infos.length; i++) { + reports[i] = infos[i].toStorageReport(); + } + return reports; + } + boolean hasStaleStorages() { synchronized (storageMap) { for (DatanodeStorageInfo storage : storageMap.values()) { @@ -275,13 +272,10 @@ public class DatanodeDescriptor extends * data-node from the block. */ boolean removeBlock(BlockInfo b) { - int index = b.findStorageInfo(this); + final DatanodeStorageInfo s = b.findStorageInfo(this); // if block exists on this datanode - if (index >= 0) { - DatanodeStorageInfo s = b.getStorageInfo(index); - if (s != null) { - return s.removeBlock(b); - } + if (s != null) { + return s.removeBlock(b); } return false; } @@ -298,24 +292,6 @@ public class DatanodeDescriptor extends return false; } - /** - * Replace specified old block with a new one in the DataNodeDescriptor. 
- * - * @param oldBlock - block to be replaced - * @param newBlock - a replacement block - * @return the new block - */ - public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) { - int index = oldBlock.findStorageInfo(this); - DatanodeStorageInfo s = oldBlock.getStorageInfo(index); - boolean done = s.removeBlock(oldBlock); - assert done : "Old block should belong to the data-node when replacing"; - - done = s.addBlock(newBlock); - assert done : "New block should not belong to the data-node when replacing"; - return newBlock; - } - public void resetBlocks() { setCapacity(0); setRemaining(0); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Aug 20 01:34:29 2014 @@ -135,7 +135,10 @@ public class DatanodeManager { /** The number of stale DataNodes */ private volatile int numStaleNodes; - + + /** The number of stale storages */ + private volatile int numStaleStorages; + /** * Whether or not this cluster has ever consisted of more than 1 rack, * according to the NetworkTopology. @@ -1142,6 +1145,22 @@ public class DatanodeManager { return this.numStaleNodes; } + /** + * Get the number of content stale storages. + */ + public int getNumStaleStorages() { + return numStaleStorages; + } + + /** + * Set the number of content stale storages. + * + * @param numStaleStorages The number of content stale storages. + */ + void setNumStaleStorages(int numStaleStorages) { + this.numStaleStorages = numStaleStorages; + } + /** Fetch live and dead datanodes. */ public void fetchDatanodes(final List<DatanodeDescriptor> live, final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java Wed Aug 20 01:34:29 2014 @@ -207,13 +207,29 @@ public class DatanodeStorageInfo { return blockPoolUsed; } - boolean addBlock(BlockInfo b) { - if(!b.addStorage(this)) - return false; + public boolean addBlock(BlockInfo b) { + // First check whether the block belongs to a different storage + // on the same DN. 
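+ // A replica can move between storages on the same datanode (e.g. after a
+ // disk is replaced); in that case the stale storage's entry is removed
+ // before the block is linked here, and the return value reports whether a
+ // genuinely new replica was added rather than moved.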
+ boolean replaced = false; + DatanodeStorageInfo otherStorage = + b.findStorageInfo(getDatanodeDescriptor()); + + if (otherStorage != null) { + if (otherStorage != this) { + // The block belongs to a different storage. Remove it first. + otherStorage.removeBlock(b); + replaced = true; + } else { + // The block is already associated with this storage. + return false; + } + } + // add to the head of the data-node list + b.addStorage(this); blockList = b.listInsert(blockList, this); numBlocks++; - return true; + return !replaced; } boolean removeBlock(BlockInfo b) { @@ -291,6 +307,12 @@ public class DatanodeStorageInfo { public String toString() { return "[" + storageType + "]" + storageID + ":" + state; } + + StorageReport toStorageReport() { + return new StorageReport( + new DatanodeStorage(storageID, state, storageType), + false, capacity, dfsUsed, remaining, blockPoolUsed); + } /** @return the first {@link DatanodeStorageInfo} corresponding to * the given datanode Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Aug 20 01:34:29 2014 @@ -256,6 +256,7 @@ class HeartbeatManager implements Datano DatanodeID dead = null; // check the number of stale nodes int numOfStaleNodes = 0; + int numOfStaleStorages = 0; synchronized(this) { for (DatanodeDescriptor d : datanodes) { if (dead == null && dm.isDatanodeDead(d)) { @@ -265,10 +266,17 @@ class HeartbeatManager implements Datano if (d.isStale(dm.getStaleInterval())) { numOfStaleNodes++; } + DatanodeStorageInfo[] storageInfos = d.getStorageInfos(); + for(DatanodeStorageInfo storageInfo : storageInfos) { + if (storageInfo.areBlockContentsStale()) { + numOfStaleStorages++; + } + } } // Set the number of stale nodes in the DatanodeManager dm.setNumStaleNodes(numOfStaleNodes); + dm.setNumStaleStorages(numOfStaleStorages); } allAlive = dead == null; Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Wed Aug 20 01:34:29 2014 @@ -23,6 +23,7 @@ import java.util.Queue; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; + import 
com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,14 +42,12 @@ class PendingDataNodeMessages { static class ReportedBlockInfo { private final Block block; - private final DatanodeDescriptor dn; - private final String storageID; + private final DatanodeStorageInfo storageInfo; private final ReplicaState reportedState; - ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block, + ReportedBlockInfo(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { - this.dn = dn; - this.storageID = storageID; + this.storageInfo = storageInfo; this.block = block; this.reportedState = reportedState; } @@ -57,21 +56,18 @@ class PendingDataNodeMessages { return block; } - DatanodeDescriptor getNode() { - return dn; - } - - String getStorageID() { - return storageID; - } - ReplicaState getReportedState() { return reportedState; } + + DatanodeStorageInfo getStorageInfo() { + return storageInfo; + } @Override public String toString() { - return "ReportedBlockInfo [block=" + block + ", dn=" + dn + return "ReportedBlockInfo [block=" + block + ", dn=" + + storageInfo.getDatanodeDescriptor() + ", reportedState=" + reportedState + "]"; } } @@ -87,7 +83,7 @@ class PendingDataNodeMessages { Queue<ReportedBlockInfo> oldQueue = entry.getValue(); while (!oldQueue.isEmpty()) { ReportedBlockInfo rbi = oldQueue.remove(); - if (!rbi.getNode().equals(dn)) { + if (!rbi.getStorageInfo().getDatanodeDescriptor().equals(dn)) { newQueue.add(rbi); } else { count--; @@ -97,11 +93,11 @@ class PendingDataNodeMessages { } } - void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block, + void enqueueReportedBlock(DatanodeStorageInfo storageInfo, Block block, ReplicaState reportedState) { block = new Block(block); getBlockQueue(block).add( - new ReportedBlockInfo(dn, storageID, block, reportedState)); + new ReportedBlockInfo(storageInfo, block, reportedState)); count++; } @@ -127,7 +123,7 @@ class PendingDataNodeMessages { return queue; } - public int count() { + int count() { return count ; } @@ -144,7 +140,7 @@ class PendingDataNodeMessages { return sb.toString(); } - public Iterable<ReportedBlockInfo> takeAll() { + Iterable<ReportedBlockInfo> takeAll() { List<ReportedBlockInfo> rbis = Lists.newArrayListWithCapacity( count); for (Queue<ReportedBlockInfo> q : queueByBlockId.values()) { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Aug 20 01:34:29 2014 @@ -21,6 +21,7 @@ import com.google.common.annotations.Vis import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.collect.Sets; + import org.apache.commons.logging.Log; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; @@ -38,6 +39,8 @@ import java.util.ArrayList; import java.util.List; import 
java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * One instance per block-pool/namespace on the DN, which handles the @@ -91,6 +94,28 @@ class BPOfferService { */ private long lastActiveClaimTxId = -1; + private final ReentrantReadWriteLock mReadWriteLock = + new ReentrantReadWriteLock(); + private final Lock mReadLock = mReadWriteLock.readLock(); + private final Lock mWriteLock = mReadWriteLock.writeLock(); + + // utility methods to acquire and release read lock and write lock + void readLock() { + mReadLock.lock(); + } + + void readUnlock() { + mReadLock.unlock(); + } + + void writeLock() { + mWriteLock.lock(); + } + + void writeUnlock() { + mWriteLock.unlock(); + } + BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) { Preconditions.checkArgument(!nnAddrs.isEmpty(), "Must pass at least one NN."); @@ -135,14 +160,19 @@ class BPOfferService { } return false; } - - synchronized String getBlockPoolId() { - if (bpNSInfo != null) { - return bpNSInfo.getBlockPoolID(); - } else { - LOG.warn("Block pool ID needed, but service not yet registered with NN", - new Exception("trace")); - return null; + + String getBlockPoolId() { + readLock(); + try { + if (bpNSInfo != null) { + return bpNSInfo.getBlockPoolID(); + } else { + LOG.warn("Block pool ID needed, but service not yet registered with NN", + new Exception("trace")); + return null; + } + } finally { + readUnlock(); } } @@ -150,27 +180,37 @@ class BPOfferService { return getNamespaceInfo() != null; } - synchronized NamespaceInfo getNamespaceInfo() { - return bpNSInfo; + NamespaceInfo getNamespaceInfo() { + readLock(); + try { + return bpNSInfo; + } finally { + readUnlock(); + } } @Override - public synchronized String toString() { - if (bpNSInfo == null) { - // If we haven't yet connected to our NN, we don't yet know our - // own block pool ID. - // If _none_ of the block pools have connected yet, we don't even - // know the DatanodeID ID of this DN. - String datanodeUuid = dn.getDatanodeUuid(); + public String toString() { + readLock(); + try { + if (bpNSInfo == null) { + // If we haven't yet connected to our NN, we don't yet know our + // own block pool ID. + // If _none_ of the block pools have connected yet, we don't even + // know the DatanodeID ID of this DN. + String datanodeUuid = dn.getDatanodeUuid(); - if (datanodeUuid == null || datanodeUuid.isEmpty()) { - datanodeUuid = "unassigned"; + if (datanodeUuid == null || datanodeUuid.isEmpty()) { + datanodeUuid = "unassigned"; + } + return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")"; + } else { + return "Block pool " + getBlockPoolId() + + " (Datanode Uuid " + dn.getDatanodeUuid() + + ")"; } - return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")"; - } else { - return "Block pool " + getBlockPoolId() + - " (Datanode Uuid " + dn.getDatanodeUuid() + - ")"; + } finally { + readUnlock(); } } @@ -266,32 +306,37 @@ class BPOfferService { * verifies that this namespace matches (eg to prevent a misconfiguration * where a StandbyNode from a different cluster is specified) */ - synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { - if (this.bpNSInfo == null) { - this.bpNSInfo = nsInfo; - boolean success = false; - - // Now that we know the namespace ID, etc, we can pass this to the DN. - // The DN can now initialize its local storage if we are the - // first BP to handshake, etc. 
- try { - dn.initBlockPool(this); - success = true; - } finally { - if (!success) { - // The datanode failed to initialize the BP. We need to reset - // the namespace info so that other BPService actors still have - // a chance to set it, and re-initialize the datanode. - this.bpNSInfo = null; + void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { + writeLock(); + try { + if (this.bpNSInfo == null) { + this.bpNSInfo = nsInfo; + boolean success = false; + + // Now that we know the namespace ID, etc, we can pass this to the DN. + // The DN can now initialize its local storage if we are the + // first BP to handshake, etc. + try { + dn.initBlockPool(this); + success = true; + } finally { + if (!success) { + // The datanode failed to initialize the BP. We need to reset + // the namespace info so that other BPService actors still have + // a chance to set it, and re-initialize the datanode. + this.bpNSInfo = null; + } } + } else { + checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), + "Blockpool ID"); + checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), + "Namespace ID"); + checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), + "Cluster ID"); } - } else { - checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(), - "Blockpool ID"); - checkNSEquality(bpNSInfo.getNamespaceID(), nsInfo.getNamespaceID(), - "Namespace ID"); - checkNSEquality(bpNSInfo.getClusterID(), nsInfo.getClusterID(), - "Cluster ID"); + } finally { + writeUnlock(); } } @@ -300,22 +345,27 @@ class BPOfferService { * NN, it calls this function to verify that the NN it connected to * is consistent with other NNs serving the block-pool. */ - synchronized void registrationSucceeded(BPServiceActor bpServiceActor, + void registrationSucceeded(BPServiceActor bpServiceActor, DatanodeRegistration reg) throws IOException { - if (bpRegistration != null) { - checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(), - reg.getStorageInfo().getNamespaceID(), "namespace ID"); - checkNSEquality(bpRegistration.getStorageInfo().getClusterID(), - reg.getStorageInfo().getClusterID(), "cluster ID"); - } else { - bpRegistration = reg; - } - - dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); - // Add the initial block token secret keys to the DN's secret manager. - if (dn.isBlockTokenEnabled) { - dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), - reg.getExportedKeys()); + writeLock(); + try { + if (bpRegistration != null) { + checkNSEquality(bpRegistration.getStorageInfo().getNamespaceID(), + reg.getStorageInfo().getNamespaceID(), "namespace ID"); + checkNSEquality(bpRegistration.getStorageInfo().getClusterID(), + reg.getStorageInfo().getClusterID(), "cluster ID"); + } else { + bpRegistration = reg; + } + + dn.bpRegistrationSucceeded(bpRegistration, getBlockPoolId()); + // Add the initial block token secret keys to the DN's secret manager. 
+ if (dn.isBlockTokenEnabled) { + dn.blockPoolTokenSecretManager.addKeys(getBlockPoolId(), + reg.getExportedKeys()); + } + } finally { + writeUnlock(); } } @@ -333,25 +383,35 @@ class BPOfferService { } } - synchronized DatanodeRegistration createRegistration() { - Preconditions.checkState(bpNSInfo != null, - "getRegistration() can only be called after initial handshake"); - return dn.createBPRegistration(bpNSInfo); + DatanodeRegistration createRegistration() { + writeLock(); + try { + Preconditions.checkState(bpNSInfo != null, + "getRegistration() can only be called after initial handshake"); + return dn.createBPRegistration(bpNSInfo); + } finally { + writeUnlock(); + } } /** * Called when an actor shuts down. If this is the last actor * to shut down, shuts down the whole blockpool in the DN. */ - synchronized void shutdownActor(BPServiceActor actor) { - if (bpServiceToActive == actor) { - bpServiceToActive = null; - } + void shutdownActor(BPServiceActor actor) { + writeLock(); + try { + if (bpServiceToActive == actor) { + bpServiceToActive = null; + } - bpServices.remove(actor); + bpServices.remove(actor); - if (bpServices.isEmpty()) { - dn.shutdownBlockPool(this); + if (bpServices.isEmpty()) { + dn.shutdownBlockPool(this); + } + } finally { + writeUnlock(); } } @@ -392,11 +452,16 @@ class BPOfferService { * @return a proxy to the active NN, or null if the BPOS has not * acknowledged any NN as active yet. */ - synchronized DatanodeProtocolClientSideTranslatorPB getActiveNN() { - if (bpServiceToActive != null) { - return bpServiceToActive.bpNamenode; - } else { - return null; + DatanodeProtocolClientSideTranslatorPB getActiveNN() { + readLock(); + try { + if (bpServiceToActive != null) { + return bpServiceToActive.bpNamenode; + } else { + return null; + } + } finally { + readUnlock(); } } @@ -424,45 +489,50 @@ class BPOfferService { * @param actor the actor which received the heartbeat * @param nnHaState the HA-related heartbeat contents */ - synchronized void updateActorStatesFromHeartbeat( + void updateActorStatesFromHeartbeat( BPServiceActor actor, NNHAStatusHeartbeat nnHaState) { - final long txid = nnHaState.getTxId(); - - final boolean nnClaimsActive = - nnHaState.getState() == HAServiceState.ACTIVE; - final boolean bposThinksActive = bpServiceToActive == actor; - final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; - - if (nnClaimsActive && !bposThinksActive) { - LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " + - "txid=" + txid); - if (!isMoreRecentClaim) { - // Split-brain scenario - an NN is trying to claim active - // state when a different NN has already claimed it with a higher - // txid. - LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" + - txid + " but there was already a more recent claim at txid=" + - lastActiveClaimTxId); - return; - } else { - if (bpServiceToActive == null) { - LOG.info("Acknowledging ACTIVE Namenode " + actor); + writeLock(); + try { + final long txid = nnHaState.getTxId(); + + final boolean nnClaimsActive = + nnHaState.getState() == HAServiceState.ACTIVE; + final boolean bposThinksActive = bpServiceToActive == actor; + final boolean isMoreRecentClaim = txid > lastActiveClaimTxId; + + if (nnClaimsActive && !bposThinksActive) { + LOG.info("Namenode " + actor + " trying to claim ACTIVE state with " + + "txid=" + txid); + if (!isMoreRecentClaim) { + // Split-brain scenario - an NN is trying to claim active + // state when a different NN has already claimed it with a higher + // txid. 
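+ // Claims are ordered by transaction ID, so accepting only strictly newer
+ // txids keeps the active-NN decision monotonic even when heartbeat
+ // responses from the two namenodes race each other.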
+ LOG.warn("NN " + actor + " tried to claim ACTIVE state at txid=" + + txid + " but there was already a more recent claim at txid=" + + lastActiveClaimTxId); + return; } else { - LOG.info("Namenode " + actor + " taking over ACTIVE state from " + - bpServiceToActive + " at higher txid=" + txid); + if (bpServiceToActive == null) { + LOG.info("Acknowledging ACTIVE Namenode " + actor); + } else { + LOG.info("Namenode " + actor + " taking over ACTIVE state from " + + bpServiceToActive + " at higher txid=" + txid); + } + bpServiceToActive = actor; } - bpServiceToActive = actor; + } else if (!nnClaimsActive && bposThinksActive) { + LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " + + "txid=" + nnHaState.getTxId()); + bpServiceToActive = null; } - } else if (!nnClaimsActive && bposThinksActive) { - LOG.info("Namenode " + actor + " relinquishing ACTIVE state with " + - "txid=" + nnHaState.getTxId()); - bpServiceToActive = null; - } - - if (bpServiceToActive == actor) { - assert txid >= lastActiveClaimTxId; - lastActiveClaimTxId = txid; + + if (bpServiceToActive == actor) { + assert txid >= lastActiveClaimTxId; + lastActiveClaimTxId = txid; + } + } finally { + writeUnlock(); } } @@ -531,14 +601,17 @@ class BPOfferService { LOG.info("DatanodeCommand action : DNA_REGISTER from " + actor.nnAddr + " with " + actor.state + " state"); actor.reRegister(); - return true; + return false; } - synchronized (this) { + writeLock(); + try { if (actor == bpServiceToActive) { return processCommandFromActive(cmd, actor); } else { return processCommandFromStandby(cmd, actor); } + } finally { + writeUnlock(); } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Aug 20 01:34:29 2014 @@ -222,7 +222,19 @@ class BPServiceActor implements Runnable // Second phase of the handshake with the NN. register(); } - + + // This is useful to make sure NN gets Heartbeat before Blockreport + // upon NN restart while DN keeps retrying Otherwise, + // 1. NN restarts. + // 2. Heartbeat RPC will retry and succeed. NN asks DN to reregister. + // 3. After reregistration completes, DN will send Blockreport first. + // 4. Given NN receives Blockreport after Heartbeat, it won't mark + // DatanodeStorageInfo#blockContentsStale to false until the next + // Blockreport. + void scheduleHeartbeat() { + lastHeartbeat = 0; + } + /** * This methods arranges for the data node to send the block report at * the next heartbeat. 
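The BPOfferService changes above replace method-level synchronized with an explicit ReentrantReadWriteLock so frequent readers (getBlockPoolId, getNamespaceInfo, getActiveNN) no longer serialize behind writers (registration, actor-state updates). A minimal, self-contained sketch of that locking pattern, with simplified names:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwLockSketch {
  private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
  private final Lock readLock = rwLock.readLock();
  private final Lock writeLock = rwLock.writeLock();
  private String blockPoolId;  // guarded by rwLock

  // Readers may proceed concurrently; previously a synchronized method.
  String getBlockPoolId() {
    readLock.lock();
    try {
      return blockPoolId;
    } finally {
      readLock.unlock();
    }
  }

  // Writers are exclusive with both readers and other writers.
  void setBlockPoolId(String id) {
    writeLock.lock();
    try {
      blockPoolId = id;
    } finally {
      writeLock.unlock();
    }
  }
}

As in the patch, every lock()/unlock() pair sits in a try/finally so the lock is released even when the guarded section throws.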
@@ -902,6 +914,7 @@ class BPServiceActor implements Runnable retrieveNamespaceInfo(); // and re-register register(); + scheduleHeartbeat(); } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Wed Aug 20 01:34:29 2014 @@ -36,8 +36,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.Properties; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); + Set<String> existingStorageDirs = new HashSet<String>(); + for (int i = 0; i < getNumStorageDirs(); i++) { + existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); + } + // 1. For each BP data directory analyze the state and // check whether all is consistent before transitioning. - this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>( dataDirs.size()); for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) { File dataDir = it.next(); + if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { + LOG.info("Storage directory " + dataDir + " has already been used."); + it.remove(); + continue; + } StorageDirectory sd = new StorageDirectory(dataDir, null, true); StorageState curState; try { @@ -152,7 +163,7 @@ public class BlockPoolSliceStorage exten // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. 
for (int idx = 0; idx < getNumStorageDirs(); idx++) { - doTransition(getStorageDir(idx), nsInfo, startOpt); + doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); assert getCTime() == nsInfo.getCTime() : "Data-node and name-node CTimes must be the same."; } @@ -242,7 +253,7 @@ public class BlockPoolSliceStorage exten * @param startOpt startup option * @throws IOException */ - private void doTransition(StorageDirectory sd, + private void doTransition(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable @@ -275,7 +286,7 @@ public class BlockPoolSliceStorage exten } if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) { - doUpgrade(sd, nsInfo); // upgrade + doUpgrade(datanode, sd, nsInfo); // upgrade return; } // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime @@ -304,7 +315,8 @@ public class BlockPoolSliceStorage exten * @param nsInfo Namespace Info from the namenode * @throws IOException on error */ - void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException { + void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) + throws IOException { // Upgrading is applicable only to release with federation or after if (!DataNodeLayoutVersion.supports( LayoutVersion.Feature.FEDERATION, layoutVersion)) { @@ -312,7 +324,7 @@ public class BlockPoolSliceStorage exten } LOG.info("Upgrading block pool storage directory " + bpSd.getRoot() + ".\n old LV = " + this.getLayoutVersion() + "; old CTime = " - + this.getCTime() + ".\n new LV = " + nsInfo.getLayoutVersion() + + this.getCTime() + ".\n new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION + "; new CTime = " + nsInfo.getCTime()); // get <SD>/previous directory String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath()); @@ -340,7 +352,7 @@ public class BlockPoolSliceStorage exten rename(bpCurDir, bpTmpDir); // 3. 
Create new <SD>/current with block file hardlinks and VERSION - linkAllBlocks(bpTmpDir, bpCurDir); + linkAllBlocks(datanode, bpTmpDir, bpCurDir); this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; assert this.namespaceID == nsInfo.getNamespaceID() : "Data-node and name-node namespace IDs must be the same."; @@ -517,14 +529,15 @@ public class BlockPoolSliceStorage exten * @param toDir the current data directory * @throws IOException if an error occurs during hardlinking */ - private void linkAllBlocks(File fromDir, File toDir) throws IOException { + private void linkAllBlocks(DataNode datanode, File fromDir, File toDir) + throws IOException { // do the link int diskLayoutVersion = this.getLayoutVersion(); // hardlink finalized blocks in tmpDir HardLink hardLink = new HardLink(); - DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), + DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); - DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW), + DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW), new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink); LOG.info( hardLink.linkStats.report() ); } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Aug 20 01:34:29 2014 @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -123,6 +124,14 @@ class BlockReceiver implements Closeable private boolean syncOnClose; private long restartBudget; + /** + * for replaceBlock response + */ + private final long responseInterval; + private long lastResponseTime = 0; + private boolean isReplaceBlock = false; + private DataOutputStream replyOut = null; + BlockReceiver(final ExtendedBlock block, final StorageType storageType, final DataInputStream in, final String inAddr, final String myAddr, @@ -144,6 +153,9 @@ class BlockReceiver implements Closeable this.isClient = !this.isDatanode; this.restartBudget = datanode.getDnConf().restartReplicaExpiry; this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; + // For replaceBlock() calls, a response should be sent periodically to avoid socketTimeout + // at clients.
So send one every 0.5 * socketTimeout. + this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -253,7 +265,7 @@ class BlockReceiver implements Closeable if (cause != null) { // possible disk error ioe = cause; - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); } throw ioe; @@ -329,7 +341,7 @@ class BlockReceiver implements Closeable } // disk check if(ioe != null) { - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); throw ioe; } } @@ -639,7 +651,7 @@ class BlockReceiver implements Closeable manageWriterOsCache(offsetInBlock); } } catch (IOException iex) { - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); throw iex; } } @@ -651,6 +663,20 @@ class BlockReceiver implements Closeable lastPacketInBlock, offsetInBlock, Status.SUCCESS); } + /* + * Send in-progress responses for the replaceBlock() calls back to the caller to + * avoid timeouts due to balancer throttling. HDFS-6247 */ + if (isReplaceBlock + && (Time.monotonicNow() - lastResponseTime > responseInterval)) { + BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() + .setStatus(Status.IN_PROGRESS); + response.build().writeDelimitedTo(replyOut); + replyOut.flush(); + + lastResponseTime = Time.monotonicNow(); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); } @@ -712,13 +738,19 @@ class BlockReceiver implements Closeable LOG.warn("Error managing cache for writer of block " + block, t); } } - + + public void sendOOB() throws IOException, InterruptedException { + ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck + .getRestartOOBStatus()); + } + void receiveBlock( DataOutputStream mirrOut, // output to next datanode DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, - DatanodeInfo[] downstreams) throws IOException { + DatanodeInfo[] downstreams, + boolean isReplaceBlock) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; @@ -726,6 +758,9 @@ class BlockReceiver implements Closeable mirrorAddr = mirrAddr; throttler = throttlerArg; + this.replyOut = replyOut; + this.isReplaceBlock = isReplaceBlock; + try { if (isClient && !isTransfer) { responder = new Daemon(datanode.threadGroup, @@ -800,9 +835,7 @@ class BlockReceiver implements Closeable // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } - try { - ((PacketResponder) responder.getRunnable()). - sendOOBResponse(PipelineAck.getRestartOOBStatus()); + try { // Even if the connection is closed after the ack packet is // flushed, the client can react to the connection closure // first. Insert a delay to lower the chance of client Thread.sleep(1000); } catch (InterruptedException ie) { // It is already going down. Ignore this.
- } catch (IOException ioe) { - LOG.info("Error sending OOB Ack.", ioe); } } responder.interrupt(); @@ -1208,7 +1239,7 @@ class BlockReceiver implements Closeable } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Aug 20 01:34:29 2014 @@ -687,7 +687,7 @@ class BlockSender implements java.io.Clo // Trigger readahead of beginning of file if configured. manageOsCache(); - final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; + final long startTime = ClientTraceLog.isDebugEnabled() ? System.nanoTime() : 0; try { int maxChunksPerPacket; int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; @@ -733,9 +733,9 @@ class BlockSender implements java.io.Clo sentEntireByteRange = true; } } finally { - if (clientTraceFmt != null) { + if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) { final long endTime = System.nanoTime(); - ClientTraceLog.info(String.format(clientTraceFmt, totalRead, + ClientTraceLog.debug(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); } close(); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Aug 20 01:34:29 2014 @@ -270,6 +270,7 @@ public class DataNode extends Configured public final static String EMPTY_DEL_HINT = ""; final AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; + DataXceiverServer xserver = null; Daemon localDataXceiverServer = null; ShortCircuitRegistry shortCircuitRegistry = null; ThreadGroup threadGroup = null; @@ -649,8 +650,8 @@ public class DataNode extends Configured streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); - this.dataXceiverServer = new Daemon(threadGroup, - new DataXceiverServer(tcpPeerServer, conf, this)); + xserver = new DataXceiverServer(tcpPeerServer, conf, this); + this.dataXceiverServer = new Daemon(threadGroup, xserver); 
this.threadGroup.setDaemon(true); // auto destroy when empty if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, @@ -1075,6 +1076,11 @@ public class DataNode extends Configured // In the case that this is the first block pool to connect, initialize // the dataset, block scanners, etc. initStorage(nsInfo); + + // Exclude failed disks before initializing the block pools to avoid startup + // failures. + checkDiskError(); + initPeriodicScanners(conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf); @@ -1133,6 +1139,11 @@ public class DataNode extends Configured } @VisibleForTesting + public DataXceiverServer getXferServer() { + return xserver; + } + + @VisibleForTesting public int getXferPort() { return streamingAddr.getPort(); } @@ -1390,6 +1401,7 @@ public class DataNode extends Configured // in order to avoid any further acceptance of requests, but the peers // for block writes are not closed until the clients are notified. if (dataXceiverServer != null) { + xserver.sendOOBToPeers(); ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); this.dataXceiverServer.interrupt(); } @@ -1510,9 +1522,9 @@ public class DataNode extends Configured /** - * Check if there is a disk failure and if so, handle the error + * Asynchronously check if there is a disk failure and, if so, handle the error */ - public void checkDiskError() { + public void checkDiskErrorAsync() { synchronized(checkDiskErrorMutex) { checkDiskErrorFlag = true; if(checkDiskErrorThread == null) { @@ -1821,7 +1833,7 @@ public class DataNode extends Configured LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0] + " got ", ie); // check if there are any disk problems - checkDiskError(); + checkDiskErrorAsync(); } finally { xmitsInProgress.getAndDecrement(); IOUtils.closeStream(blockSender); @@ -2759,7 +2771,18 @@ public class DataNode extends Configured public ShortCircuitRegistry getShortCircuitRegistry() { return shortCircuitRegistry; } - + + /** + * Check the data directories for disk errors and handle any failures found + */ + private void checkDiskError() { + try { + data.checkDataDir(); + } catch (DiskErrorException de) { + handleDiskError(de.getMessage()); + } + } + /** * Starts a new thread which will check for disk error check requests * every 5 sec @@ -2776,9 +2799,7 @@ public class DataNode extends Configured } if(tempFlag) { try { - data.checkDataDir(); - } catch (DiskErrorException de) { - handleDiskError(de.getMessage()); + checkDiskError(); } catch (Exception e) { LOG.warn("Unexpected exception occurred while checking disk error " + e); checkDiskErrorThread = null; Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Wed Aug 20 01:34:29 2014 @@ -62,7 +62,10 @@ public class DataNodeLayoutVersion { * </ul> */ public static enum Feature implements LayoutFeature { - FIRST_LAYOUT(-55, -53, "First datanode layout", false); +
FIRST_LAYOUT(-55, -53, "First datanode layout", false), + BLOCKID_BASED_LAYOUT(-56, + "The block ID of a finalized block uniquely determines its position " + + "in the directory structure"); private final FeatureInfo info;
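The BLOCKID_BASED_LAYOUT feature added above implies that a finalized replica's on-disk directory can be computed from its block ID alone, with no per-replica bookkeeping. The sketch below illustrates that idea under stated assumptions: the two-level subdirectory scheme, the bit masks and shifts, and the "subdir" naming are illustrative stand-ins, not the exact constants of the -56 layout.

    import java.io.File;

    // Illustrative only: derive a replica's directory from its block ID.
    // Masks, shifts, and "subdir" naming are assumptions for the example.
    public class BlockIdLayoutSketch {
      static File idToBlockDir(File finalizedRoot, long blockId) {
        int d1 = (int) ((blockId >> 16) & 0x1F); // first-level bucket (assumed)
        int d2 = (int) ((blockId >> 8) & 0x1F);  // second-level bucket (assumed)
        return new File(finalizedRoot,
            "subdir" + d1 + File.separator + "subdir" + d2);
      }

      public static void main(String[] args) {
        // The same block ID always maps to the same directory, so the
        // layout needs no per-replica location tracking.
        System.out.println(
            idToBlockDir(new File("current/finalized"), 1073741825L));
      }
    }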