http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 6c6d758,1346ab3..8232ab9 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@@ -674,8 -648,8 +674,8 @@@ public class BlockManager implements Bl return false; // already completed (e.g. by syncBlock) final boolean b = commitBlock(lastBlock, commitBlock); - if (hasMinStorage(lastBlock)) { - completeBlock(bc, bc.numBlocks() - 1, false); - if (countNodes(lastBlock).liveReplicas() >= minReplication) { ++ if (hasMinStorage(lastBlock)) { + completeBlock(lastBlock, false); } return b; } @@@ -698,9 -666,9 +692,9 @@@ } int numNodes = curBlock.numNodes(); - if (!force && numNodes < minReplication) { + if (!force && !hasMinStorage(curBlock, numNodes)) { - throw new IOException("Cannot complete block: " + - "block does not satisfy minimal replication requirement."); + throw new IOException("Cannot complete block: " + + "block does not satisfy minimal replication requirement."); } if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) { throw new IOException( @@@ -718,26 -683,10 +709,12 @@@ // a "forced" completion when a file is getting closed by an // OP_CLOSE edit on the standby). namesystem.adjustSafeModeBlockTotals(0, 1); + final int minStorage = curBlock.isStriped() ? + ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication; namesystem.incrementSafeBlockCount( - Math.min(numNodes, minReplication)); + Math.min(numNodes, minStorage), curBlock); - - // replace block in the blocksMap - return blocksMap.replaceBlock(completeBlock); } - private BlockInfo completeBlock(final BlockCollection bc, - final BlockInfo block, boolean force) throws IOException { - BlockInfo[] fileBlocks = bc.getBlocks(); - for (int idx = 0; idx < fileBlocks.length; idx++) { - if (fileBlocks[idx] == block) { - return completeBlock(bc, idx, force); - } - } - return block; - } - /** * Force the given block in the given file to be marked as complete, * regardless of whether enough replicas are present. This is necessary @@@ -1270,37 -1162,29 +1245,36 @@@ private void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { -- - if (b.stored.isDeleted()) { - if (b.getCorrupted().isDeleted()) { ++ if (b.getStored().isDeleted()) { blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); - addToInvalidates(b.corrupted, node); + addToInvalidates(b.getCorrupted(), node); return; - } - short expectedReplicas = b.getCorrupted().getReplication(); + } + short expectedReplicas = - getExpectedReplicaNum(b.stored); ++ getExpectedReplicaNum(b.getStored()); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored, b.corrupted); - storageInfo.addBlock(b.getStored()); ++ storageInfo.addBlock(b.getStored(), b.getCorrupted()); } - // Add this replica to corruptReplicas Map - corruptReplicas.addToCorruptReplicasMap(b.getCorrupted(), node, - b.getReason(), b.getReasonCode()); + // Add this replica to corruptReplicas Map. For striped blocks, we always + // use the id of whole striped block group when adding to corruptReplicas - Block corrupted = new Block(b.corrupted); - if (b.stored.isStriped()) { - corrupted.setBlockId(b.stored.getBlockId()); ++ Block corrupted = new Block(b.getCorrupted()); ++ if (b.getStored().isStriped()) { ++ corrupted.setBlockId(b.getStored().getBlockId()); + } - corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason, - b.reasonCode); ++ corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.getReason(), ++ b.getReasonCode()); - NumberReplicas numberOfReplicas = countNodes(b.stored); + NumberReplicas numberOfReplicas = countNodes(b.getStored()); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; - boolean minReplicationSatisfied = - numberOfReplicas.liveReplicas() >= minReplication; + - boolean minReplicationSatisfied = hasMinStorage(b.stored, ++ boolean minReplicationSatisfied = hasMinStorage(b.getStored(), + numberOfReplicas.liveReplicas()); + boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; @@@ -1315,10 -1199,10 +1289,10 @@@ if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) { // the block is over-replicated so invalidate the replicas immediately - invalidateBlock(b, node); + invalidateBlock(b, node, numberOfReplicas); } else if (namesystem.isPopulatingReplQueues()) { // add the block to neededReplication - updateNeededReplications(b.stored, -1, 0); + updateNeededReplications(b.getStored(), -1, 0); } } @@@ -1342,13 -1227,12 +1316,13 @@@ "invalidation of {} on {} because {} replica(s) are located on " + "nodes with potentially out-of-date block reports", b, dn, nr.replicasOnStaleNodes()); - postponeBlock(b.corrupted); + postponeBlock(b.getCorrupted()); return false; - } else if (nr.liveReplicas() >= 1) { - // If we have at least one copy on a live node, then we can delete it. + } else { + // we already checked the number of replicas in the caller of this + // function and know there are enough live replicas, so we can delete it. - addToInvalidates(b.corrupted, dn); - removeStoredBlock(b.stored, node); + addToInvalidates(b.getCorrupted(), dn); + removeStoredBlock(b.getStored(), node); blockLog.debug("BLOCK* invalidateBlocks: {} on {} listed for deletion.", b, dn); return true; @@@ -1446,72 -1326,11 +1420,11 @@@ namesystem.writeLock(); try { synchronized (neededReplications) { - for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (BlockInfo block : blocksToReplicate.get(priority)) { - ReplicationWork rw = scheduleReplication(block, priority); + for (int priority = 0; priority < blocksToRecover.size(); priority++) { + for (BlockInfo block : blocksToRecover.get(priority)) { - // block should belong to a file - bc = getBlockCollection(block); - // abandoned block or block reopened for append - if (bc == null - || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - // remove from neededReplications - neededReplications.remove(block, priority); - continue; - } - - requiredReplication = getExpectedReplicaNum(block); - - // get a source data-node - containingNodes = new ArrayList<>(); - List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); - NumberReplicas numReplicas = new NumberReplicas(); - List<Short> liveBlockIndices = new ArrayList<>(); - final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, - containingNodes, liveReplicaNodes, numReplicas, - liveBlockIndices, priority); - if(srcNodes == null || srcNodes.length == 0) { - // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be recovered " + - "from any node"); - continue; - } - - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are - // not included in the numReplicas.liveReplicas() count - assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); - - // do not schedule more if enough replicas is already pending - numEffectiveReplicas = numReplicas.liveReplicas() + - pendingReplications.getNumReplicas(block); - - if (numEffectiveReplicas >= requiredReplication) { - if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block, requiredReplication)) ) { - neededReplications.remove(block, priority); // remove from neededReplications - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - continue; - } - } - - if (numReplicas.liveReplicas() < requiredReplication) { - additionalReplRequired = requiredReplication - - numEffectiveReplicas; - } else { - additionalReplRequired = 1; // Needed on a new rack - } - if (block.isStriped()) { - short[] indices = new short[liveBlockIndices.size()]; - for (int i = 0 ; i < liveBlockIndices.size(); i++) { - indices[i] = liveBlockIndices.get(i); - } - ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority, indices); - recovWork.add(ecw); - } else { - recovWork.add(new ReplicationWork(block, bc, srcNodes, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority)); ++ BlockRecoveryWork rw = scheduleRecovery(block, priority); + if (rw != null) { - work.add(rw); ++ recovWork.add(rw); } } } @@@ -1520,9 -1339,8 +1433,9 @@@ namesystem.writeUnlock(); } + // Step 2: choose target nodes for each recovery task - final Set<Node> excludedNodes = new HashSet<Node>(); + final Set<Node> excludedNodes = new HashSet<>(); - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); @@@ -1533,101 -1351,21 +1446,24 @@@ // choose replication targets: NOT HOLDING THE GLOBAL LOCK // It is costly to extract the filename for which chooseTargets is called, // so for now we pass in the block collection itself. - rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); + final BlockPlacementPolicy placementPolicy = - placementPolicies.getPolicy(rw.block.isStriped()); ++ placementPolicies.getPolicy(rw.getBlock().isStriped()); + rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes); } + // Step 3: add tasks to the DN namesystem.writeLock(); try { - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ - final DatanodeStorageInfo[] targets = rw.targets; + final DatanodeStorageInfo[] targets = rw.getTargets(); if(targets == null || targets.length == 0){ - rw.targets = null; + rw.resetTargets(); continue; } synchronized (neededReplications) { - BlockInfo block = rw.block; - int priority = rw.priority; - // Recheck since global lock was released - // block should belong to a file - bc = getBlockCollection(block); - // abandoned block or block reopened for append - if(bc == null || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { - neededReplications.remove(block, priority); // remove from neededReplications - rw.targets = null; - continue; - } - requiredReplication = getExpectedReplicaNum(block); - - // do not schedule more if enough replicas is already pending - NumberReplicas numReplicas = countNodes(block); - numEffectiveReplicas = numReplicas.liveReplicas() + - pendingReplications.getNumReplicas(block); - - if (numEffectiveReplicas >= requiredReplication) { - if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block, requiredReplication)) ) { - neededReplications.remove(block, priority); // remove from neededReplications - rw.targets = null; - blockLog.debug("BLOCK* Removing {} from neededReplications as" + - " it has enough replicas", block); - continue; - } - } - - if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block, requiredReplication)) ) { - if (rw.srcNodes[0].getNetworkLocation().equals( - targets[0].getDatanodeDescriptor().getNetworkLocation())) { - //No use continuing, unless a new rack in this case - continue; - } - } - - // Add block to the to be replicated list - if (block.isStriped()) { - assert rw instanceof ErasureCodingWork; - assert rw.targets.length > 0; - String src = getBlockCollection(block).getName(); - ErasureCodingZone ecZone = null; - try { - ecZone = namesystem.getErasureCodingZoneForPath(src); - } catch (IOException e) { - blockLog - .warn("Failed to get the EC zone for the file {} ", src); - } - if (ecZone == null) { - blockLog.warn("No erasure coding policy found for the file {}. " - + "So cannot proceed for recovery", src); - // TODO: we may have to revisit later for what we can do better to - // handle this case. - continue; - } - rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( - new ExtendedBlock(namesystem.getBlockPoolId(), block), - rw.srcNodes, rw.targets, - ((ErasureCodingWork) rw).liveBlockIndicies, - ecZone.getErasureCodingPolicy()); - } else { - rw.srcNodes[0].addBlockToBeReplicated(block, targets); - } - scheduledWork++; - DatanodeStorageInfo.incrementBlocksScheduled(targets); - - // Move the block-replication into a "pending" state. - // The reason we use 'pending' is so we can retry - // replications that fail after an appropriate amount of time. - pendingReplications.increment(block, - DatanodeStorageInfo.toDatanodeDescriptors(targets)); - blockLog.debug("BLOCK* block {} is moved from neededReplications to " - + "pendingReplications", block); - - // remove from neededReplications - if(numEffectiveReplicas + targets.length >= requiredReplication) { - neededReplications.remove(block, priority); // remove from neededReplications - if (validateReplicationWork(rw)) { ++ if (validateRecoveryWork(rw)) { + scheduledWork++; } } } @@@ -1637,16 -1375,16 +1473,16 @@@ if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ - DatanodeStorageInfo[] targets = rw.targets; + DatanodeStorageInfo[] targets = rw.getTargets(); if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); for (DatanodeStorageInfo target : targets) { targetList.append(' '); targetList.append(target.getDatanodeDescriptor()); } - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, - rw.block, targetList); - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNode(), ++ blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.getSrcNodes(), + rw.getBlock(), targetList); } } } @@@ -1658,6 -1396,118 +1494,160 @@@ return scheduledWork; } + boolean hasEnoughEffectiveReplicas(BlockInfo block, + NumberReplicas numReplicas, int pendingReplicaNum, int required) { + int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum; + return (numEffectiveReplicas >= required) && - (pendingReplicaNum > 0 || blockHasEnoughRacks(block)); ++ (pendingReplicaNum > 0 || blockHasEnoughRacks(block, required)); + } + - private ReplicationWork scheduleReplication(BlockInfo block, int priority) { ++ private BlockRecoveryWork scheduleRecovery(BlockInfo block, int priority) { + // block should belong to a file + BlockCollection bc = getBlockCollection(block); + // abandoned block or block reopened for append + if (bc == null + || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + // remove from neededReplications + neededReplications.remove(block, priority); + return null; + } + + short requiredReplication = getExpectedReplicaNum(block); + + // get a source data-node + List<DatanodeDescriptor> containingNodes = new ArrayList<>(); + List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); + NumberReplicas numReplicas = new NumberReplicas(); - DatanodeDescriptor srcNode = chooseSourceDatanode(block, containingNodes, - liveReplicaNodes, numReplicas, priority); - if (srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); ++ List<Short> liveBlockIndices = new ArrayList<>(); ++ final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, ++ containingNodes, liveReplicaNodes, numReplicas, ++ liveBlockIndices, priority); ++ if(srcNodes == null || srcNodes.length == 0) { ++ // block can not be recovered from any node ++ LOG.debug("Block " + block + " cannot be recovered " + ++ "from any node"); + return null; + } + + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // not included in the numReplicas.liveReplicas() count + assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); + + int pendingNum = pendingReplications.getNumReplicas(block); + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, + requiredReplication)) { + neededReplications.remove(block, priority); + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + return null; + } + + final int additionalReplRequired; + if (numReplicas.liveReplicas() < requiredReplication) { + additionalReplRequired = requiredReplication - numReplicas.liveReplicas() + - pendingNum; + } else { + additionalReplRequired = 1; // Needed on a new rack + } - return new ReplicationWork(block, bc, srcNode, containingNodes, - liveReplicaNodes, additionalReplRequired, priority); ++ ++ if (block.isStriped()) { ++ short[] indices = new short[liveBlockIndices.size()]; ++ for (int i = 0 ; i < liveBlockIndices.size(); i++) { ++ indices[i] = liveBlockIndices.get(i); ++ } ++ return new ErasureCodingWork(block, bc, srcNodes, ++ containingNodes, liveReplicaNodes, additionalReplRequired, ++ priority, indices); ++ } else { ++ return new ReplicationWork(block, bc, srcNodes, ++ containingNodes, liveReplicaNodes, additionalReplRequired, ++ priority); ++ } + } + - private boolean validateReplicationWork(ReplicationWork rw) { ++ private boolean validateRecoveryWork(BlockRecoveryWork rw) { + BlockInfo block = rw.getBlock(); + int priority = rw.getPriority(); + // Recheck since global lock was released + // block should belong to a file + BlockCollection bc = getBlockCollection(block); + // abandoned block or block reopened for append + if (bc == null + || (bc.isUnderConstruction() && block.equals(bc.getLastBlock()))) { + neededReplications.remove(block, priority); + rw.resetTargets(); + return false; + } + + // do not schedule more if enough replicas is already pending + final short requiredReplication = getExpectedReplicaNum(block); + NumberReplicas numReplicas = countNodes(block); + final int pendingNum = pendingReplications.getNumReplicas(block); + if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, + requiredReplication)) { + neededReplications.remove(block, priority); + rw.resetTargets(); + blockLog.debug("BLOCK* Removing {} from neededReplications as" + + " it has enough replicas", block); + return false; + } + + DatanodeStorageInfo[] targets = rw.getTargets(); + if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { - if (rw.getSrcNode().getNetworkLocation().equals( ++ (!blockHasEnoughRacks(block, requiredReplication)) ) { ++ if (rw.getSrcNodes()[0].getNetworkLocation().equals( + targets[0].getDatanodeDescriptor().getNetworkLocation())) { + //No use continuing, unless a new rack in this case + return false; + } + } + - // Add block to the to be replicated list - rw.getSrcNode().addBlockToBeReplicated(block, targets); ++ // Add block to the to be recovered list ++ if (block.isStriped()) { ++ assert rw instanceof ErasureCodingWork; ++ assert rw.getTargets().length > 0; ++ String src = getBlockCollection(block).getName(); ++ ErasureCodingZone ecZone = null; ++ try { ++ ecZone = namesystem.getErasureCodingZoneForPath(src); ++ } catch (IOException e) { ++ blockLog ++ .warn("Failed to get the EC zone for the file {} ", src); ++ } ++ if (ecZone == null) { ++ blockLog.warn("No erasure coding policy found for the file {}. " ++ + "So cannot proceed for recovery", src); ++ // TODO: we may have to revisit later for what we can do better to ++ // handle this case. ++ return false; ++ } ++ rw.getTargets()[0].getDatanodeDescriptor().addBlockToBeErasureCoded( ++ new ExtendedBlock(namesystem.getBlockPoolId(), block), ++ rw.getSrcNodes(), rw.getTargets(), ++ ((ErasureCodingWork) rw).getLiveBlockIndicies(), ++ ecZone.getErasureCodingPolicy()); ++ } else { ++ rw.getSrcNodes()[0].addBlockToBeReplicated(block, targets); ++ } ++ + DatanodeStorageInfo.incrementBlocksScheduled(targets); + + // Move the block-replication into a "pending" state. + // The reason we use 'pending' is so we can retry + // replications that fail after an appropriate amount of time. + pendingReplications.increment(block, + DatanodeStorageInfo.toDatanodeDescriptors(targets)); + blockLog.debug("BLOCK* block {} is moved from neededReplications to " + + "pendingReplications", block); + + int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum; + // remove from neededReplications + if(numEffectiveReplicas + targets.length >= requiredReplication) { + neededReplications.remove(block, priority); + } + return true; + } + /** Choose target for WebHDFS redirection. */ public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) { @@@ -1916,59 -1755,7 +1906,17 @@@ } } + private static class BlockInfoToAdd { + final BlockInfo stored; + final Block reported; + + BlockInfoToAdd(BlockInfo stored, Block reported) { + this.stored = stored; + this.reported = reported; + } + } + /** - * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a - * list of blocks that should be considered corrupt due to a block report. - */ - private static class BlockToMarkCorrupt { - /** - * The corrupted block in a datanode. This is the one reported by the - * datanode. - */ - final Block corrupted; - /** The corresponding block stored in the BlockManager. */ - final BlockInfo stored; - /** The reason to mark corrupt. */ - final String reason; - /** The reason code to be stored */ - final Reason reasonCode; - - BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason, - Reason reasonCode) { - Preconditions.checkNotNull(corrupted, "corrupted is null"); - Preconditions.checkNotNull(stored, "stored is null"); - - this.corrupted = corrupted; - this.stored = stored; - this.reason = reason; - this.reasonCode = reasonCode; - } - - BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, - String reason, Reason reasonCode) { - this(corrupted, stored, reason, reasonCode); - //the corrupted block in datanode has a different generation stamp - corrupted.setGenerationStamp(gs); - } - - @Override - public String toString() { - return corrupted + "(" - + (corrupted == stored? "same as stored": "stored=" + stored) + ")"; - } - } - - /** * The given storage is reporting all its blocks. * Update the (storage-->block list) and (block-->storage list) maps. * @@@ -2721,8 -2484,8 +2669,8 @@@ // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && numCurrentReplica >= minReplication) { + && hasMinStorage(storedBlock, numCurrentReplica)) { - completeBlock(getBlockCollection(storedBlock), storedBlock, false); + completeBlock(storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that. @@@ -2796,8 -2558,8 +2744,8 @@@ + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && - numLiveReplicas >= minReplication) { + hasMinStorage(storedBlock, numLiveReplicas)) { - storedBlock = completeBlock(bc, storedBlock, false); + completeBlock(storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that @@@ -4171,138 -3740,7 +4119,32 @@@ null); } + public static LocatedStripedBlock newLocatedStripedBlock( + ExtendedBlock b, DatanodeStorageInfo[] storages, + int[] indices, long startOffset, boolean corrupt) { + // startOffset is unknown + return new LocatedStripedBlock( + b, DatanodeStorageInfo.toDatanodeInfos(storages), + DatanodeStorageInfo.toStorageIDs(storages), + DatanodeStorageInfo.toStorageTypes(storages), + indices, startOffset, corrupt, + null); + } + + public static LocatedBlock newLocatedBlock(ExtendedBlock eb, BlockInfo info, + DatanodeStorageInfo[] locs, long offset) throws IOException { + final LocatedBlock lb; + if (info.isStriped()) { + lb = newLocatedStripedBlock(eb, locs, + info.getUnderConstructionFeature().getBlockIndices(), + offset, false); + } else { + lb = newLocatedBlock(eb, locs, offset, false); + } + return lb; + } + /** - * This class is used internally by {@link this#computeRecoveryWorkForBlocks} - * to represent a task to recover a block through replication or erasure - * coding. Recovery is done by transferring data from srcNodes to targets - */ - private abstract static class BlockRecoveryWork { - final BlockInfo block; - final BlockCollection bc; - - /** - * An erasure coding recovery task has multiple source nodes. - * A replication task only has 1 source node, stored on top of the array - */ - final DatanodeDescriptor[] srcNodes; - /** Nodes containing the block; avoid them in choosing new targets */ - final List<DatanodeDescriptor> containingNodes; - /** Required by {@link BlockPlacementPolicy#chooseTarget} */ - final List<DatanodeStorageInfo> liveReplicaStorages; - final int additionalReplRequired; - - DatanodeStorageInfo[] targets; - final int priority; - - BlockRecoveryWork(BlockInfo block, - BlockCollection bc, - DatanodeDescriptor[] srcNodes, - List<DatanodeDescriptor> containingNodes, - List<DatanodeStorageInfo> liveReplicaStorages, - int additionalReplRequired, - int priority) { - this.block = block; - this.bc = bc; - this.srcNodes = srcNodes; - this.containingNodes = containingNodes; - this.liveReplicaStorages = liveReplicaStorages; - this.additionalReplRequired = additionalReplRequired; - this.priority = priority; - this.targets = null; - } - - abstract void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicySuite storagePolicySuite, - Set<Node> excludedNodes); - } - - private static class ReplicationWork extends BlockRecoveryWork { - ReplicationWork(BlockInfo block, - BlockCollection bc, - DatanodeDescriptor[] srcNodes, - List<DatanodeDescriptor> containingNodes, - List<DatanodeStorageInfo> liveReplicaStorages, - int additionalReplRequired, - int priority) { - super(block, bc, srcNodes, containingNodes, - liveReplicaStorages, additionalReplRequired, priority); - LOG.debug("Creating a ReplicationWork to recover " + block); - } - - @Override - void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicySuite storagePolicySuite, - Set<Node> excludedNodes) { - assert srcNodes.length > 0 - : "At least 1 source node should have been selected"; - try { - targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNodes[0], liveReplicaStorages, false, - excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); - } finally { - srcNodes[0].decrementPendingReplicationWithoutTargets(); - } - } - } - - private static class ErasureCodingWork extends BlockRecoveryWork { - final short[] liveBlockIndicies; - - ErasureCodingWork(BlockInfo block, - BlockCollection bc, - DatanodeDescriptor[] srcNodes, - List<DatanodeDescriptor> containingNodes, - List<DatanodeStorageInfo> liveReplicaStorages, - int additionalReplRequired, - int priority, short[] liveBlockIndicies) { - super(block, bc, srcNodes, containingNodes, - liveReplicaStorages, additionalReplRequired, priority); - this.liveBlockIndicies = liveBlockIndicies; - LOG.debug("Creating an ErasureCodingWork to recover " + block); - } - - @Override - void chooseTargets(BlockPlacementPolicy blockplacement, - BlockStoragePolicySuite storagePolicySuite, - Set<Node> excludedNodes) { - try { - // TODO: new placement policy for EC considering multiple writers - targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNodes[0], liveReplicaStorages, false, - excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); - } finally { - } - } - } - - /** * A simple result enum for the result of * {@link BlockManager#processMisReplicatedBlock(BlockInfo)}. */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java index 0000000,0000000..ed546df new file mode 100644 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockRecoveryWork.java @@@ -1,0 -1,0 +1,111 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hdfs.server.blockmanagement; ++ ++import org.apache.hadoop.net.Node; ++ ++import java.util.Collections; ++import java.util.List; ++import java.util.Set; ++ ++/** ++ * This class is used internally by ++ * {@link BlockManager#computeRecoveryWorkForBlocks} to represent a task to ++ * recover a block through replication or erasure coding. Recovery is done by ++ * transferring data from srcNodes to targets ++ */ ++abstract class BlockRecoveryWork { ++ private final BlockInfo block; ++ ++ private final BlockCollection bc; ++ ++ /** ++ * An erasure coding recovery task has multiple source nodes. ++ * A replication task only has 1 source node, stored on top of the array ++ */ ++ private final DatanodeDescriptor[] srcNodes; ++ /** Nodes containing the block; avoid them in choosing new targets */ ++ private final List<DatanodeDescriptor> containingNodes; ++ /** Required by {@link BlockPlacementPolicy#chooseTarget} */ ++ private final List<DatanodeStorageInfo> liveReplicaStorages; ++ private final int additionalReplRequired; ++ ++ private DatanodeStorageInfo[] targets; ++ private final int priority; ++ ++ public BlockRecoveryWork(BlockInfo block, ++ BlockCollection bc, ++ DatanodeDescriptor[] srcNodes, ++ List<DatanodeDescriptor> containingNodes, ++ List<DatanodeStorageInfo> liveReplicaStorages, ++ int additionalReplRequired, ++ int priority) { ++ this.block = block; ++ this.bc = bc; ++ this.srcNodes = srcNodes; ++ this.containingNodes = containingNodes; ++ this.liveReplicaStorages = liveReplicaStorages; ++ this.additionalReplRequired = additionalReplRequired; ++ this.priority = priority; ++ this.targets = null; ++ } ++ ++ DatanodeStorageInfo[] getTargets() { ++ return targets; ++ } ++ ++ void resetTargets() { ++ this.targets = null; ++ } ++ ++ void setTargets(DatanodeStorageInfo[] targets) { ++ this.targets = targets; ++ } ++ ++ List<DatanodeDescriptor> getContainingNodes() { ++ return Collections.unmodifiableList(containingNodes); ++ } ++ ++ public int getPriority() { ++ return priority; ++ } ++ ++ public BlockInfo getBlock() { ++ return block; ++ } ++ ++ public DatanodeDescriptor[] getSrcNodes() { ++ return srcNodes; ++ } ++ ++ BlockCollection getBc() { ++ return bc; ++ } ++ ++ List<DatanodeStorageInfo> getLiveReplicaStorages() { ++ return liveReplicaStorages; ++ } ++ ++ public int getAdditionalReplRequired() { ++ return additionalReplRequired; ++ } ++ ++ abstract void chooseTargets(BlockPlacementPolicy blockplacement, ++ BlockStoragePolicySuite storagePolicySuite, ++ Set<Node> excludedNodes); ++} http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java index 0000000,3842e56..a871390 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockToMarkCorrupt.java @@@ -1,0 -1,87 +1,82 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.server.blockmanagement; + + import static org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason; + + import com.google.common.base.Preconditions; ++import org.apache.hadoop.hdfs.protocol.Block; + + /** + * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a + * list of blocks that should be considered corrupt due to a block report. + */ + class BlockToMarkCorrupt { + /** The corrupted block in a datanode. */ - private final BlockInfo corrupted; ++ private final Block corrupted; + /** The corresponding block stored in the BlockManager. */ + private final BlockInfo stored; + /** The reason to mark corrupt. */ + private final String reason; + /** The reason code to be stored */ + private final CorruptReplicasMap.Reason reasonCode; + - BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason, ++ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason, + CorruptReplicasMap.Reason reasonCode) { + Preconditions.checkNotNull(corrupted, "corrupted is null"); + Preconditions.checkNotNull(stored, "stored is null"); + + this.corrupted = corrupted; + this.stored = stored; + this.reason = reason; + this.reasonCode = reasonCode; + } + - BlockToMarkCorrupt(BlockInfo stored, String reason, ++ BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, String reason, + CorruptReplicasMap.Reason reasonCode) { - this(stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, - CorruptReplicasMap.Reason reasonCode) { - this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, - reason, reasonCode); ++ this(corrupted, stored, reason, reasonCode); + //the corrupted block in datanode has a different generation stamp - corrupted.setGenerationStamp(gs); ++ this.corrupted.setGenerationStamp(gs); + } + + public boolean isCorruptedDuringWrite() { + return stored.getGenerationStamp() > corrupted.getGenerationStamp(); + } + - public BlockInfo getCorrupted() { ++ public Block getCorrupted() { + return corrupted; + } + + public BlockInfo getStored() { + return stored; + } + + public String getReason() { + return reason; + } + + public Reason getReasonCode() { + return reasonCode; + } + + @Override + public String toString() { + return corrupted + "(" + + (corrupted == stored ? "same as stored": "stored=" + stored) + ")"; + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 29e541c,0b398c5..b258f06 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@@ -38,9 -39,8 +39,10 @@@ import org.apache.hadoop.fs.StorageType import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.namenode.CachedBlock; +import org.apache.hadoop.hdfs.server.protocol.BlockECRecoveryCommand.BlockECRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; @@@ -696,29 -663,32 +698,39 @@@ public class DatanodeDescriptor extend } } + @VisibleForTesting + public boolean containsInvalidateBlock(Block block) { + synchronized (invalidateBlocks) { + return invalidateBlocks.contains(block); + } + } + /** - * Return the sum of remaining spaces of the specified type. If the remaining - * space of a storage is less than minSize, it won't be counted toward the - * sum. + * Find whether the datanode contains good storage of given type to + * place block of size <code>blockSize</code>. + * + * <p>Currently datanode only cares about the storage type, in this + * method, the first storage of given type we see is returned. * - * @param t The storage type. If null, the type is ignored. - * @param minSize The minimum free space required. - * @return the sum of remaining spaces that are bigger than minSize. + * @param t requested storage type + * @param blockSize requested block size + * @return */ - public long getRemaining(StorageType t, long minSize) { + public DatanodeStorageInfo chooseStorage4Block(StorageType t, + long blockSize) { + final long requiredSize = + blockSize * HdfsServerConstants.MIN_BLOCKS_FOR_WRITE; + final long scheduledSize = blockSize * getBlocksScheduled(t); long remaining = 0; + DatanodeStorageInfo storage = null; for (DatanodeStorageInfo s : getStorageInfos()) { if (s.getState() == State.NORMAL && - (t == null || s.getStorageType() == t)) { + s.getStorageType() == t) { + if (storage == null) { + storage = s; + } long r = s.getRemaining(); - if (r >= minSize) { + if (r >= requiredSize) { remaining += r; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java index 0000000,0000000..761d6d0 new file mode 100644 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java @@@ -1,0 -1,0 +1,60 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.hdfs.server.blockmanagement; ++ ++import org.apache.hadoop.net.Node; ++ ++import java.util.List; ++import java.util.Set; ++ ++class ErasureCodingWork extends BlockRecoveryWork { ++ private final short[] liveBlockIndicies; ++ ++ public ErasureCodingWork(BlockInfo block, ++ BlockCollection bc, ++ DatanodeDescriptor[] srcNodes, ++ List<DatanodeDescriptor> containingNodes, ++ List<DatanodeStorageInfo> liveReplicaStorages, ++ int additionalReplRequired, ++ int priority, short[] liveBlockIndicies) { ++ super(block, bc, srcNodes, containingNodes, ++ liveReplicaStorages, additionalReplRequired, priority); ++ this.liveBlockIndicies = liveBlockIndicies; ++ BlockManager.LOG.debug("Creating an ErasureCodingWork to recover " + block); ++ } ++ ++ short[] getLiveBlockIndicies() { ++ return liveBlockIndicies; ++ } ++ ++ @Override ++ void chooseTargets(BlockPlacementPolicy blockplacement, ++ BlockStoragePolicySuite storagePolicySuite, ++ Set<Node> excludedNodes) { ++ try { ++ // TODO: new placement policy for EC considering multiple writers ++ DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget( ++ getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], ++ getLiveReplicaStorages(), false, excludedNodes, ++ getBlock().getNumBytes(), ++ storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); ++ setTargets(chosenTargets); ++ } finally { ++ } ++ } ++} http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java index 0000000,f8a6dad..8266f45 mode 000000,100644..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicationWork.java @@@ -1,0 -1,87 +1,53 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.hdfs.server.blockmanagement; + + import org.apache.hadoop.net.Node; + + import java.util.Collections; + import java.util.List; + import java.util.Set; + -class ReplicationWork { - private final BlockInfo block; - private final BlockCollection bc; - private final DatanodeDescriptor srcNode; - private final int additionalReplRequired; - private final int priority; - private final List<DatanodeDescriptor> containingNodes; - private final List<DatanodeStorageInfo> liveReplicaStorages; - private DatanodeStorageInfo[] targets; - ++class ReplicationWork extends BlockRecoveryWork { + public ReplicationWork(BlockInfo block, BlockCollection bc, - DatanodeDescriptor srcNode, List<DatanodeDescriptor> containingNodes, ++ DatanodeDescriptor[] srcNodes, List<DatanodeDescriptor> containingNodes, + List<DatanodeStorageInfo> liveReplicaStorages, int additionalReplRequired, + int priority) { - this.block = block; - this.bc = bc; - this.srcNode = srcNode; - this.srcNode.incrementPendingReplicationWithoutTargets(); - this.containingNodes = containingNodes; - this.liveReplicaStorages = liveReplicaStorages; - this.additionalReplRequired = additionalReplRequired; - this.priority = priority; - this.targets = null; ++ super(block, bc, srcNodes, containingNodes, ++ liveReplicaStorages, additionalReplRequired, priority); ++ BlockManager.LOG.debug("Creating a ReplicationWork to recover " + block); + } + ++ @Override + void chooseTargets(BlockPlacementPolicy blockplacement, + BlockStoragePolicySuite storagePolicySuite, + Set<Node> excludedNodes) { ++ assert getSrcNodes().length > 0 ++ : "At least 1 source node should have been selected"; + try { - targets = blockplacement.chooseTarget(bc.getName(), - additionalReplRequired, srcNode, liveReplicaStorages, false, - excludedNodes, block.getNumBytes(), - storagePolicySuite.getPolicy(bc.getStoragePolicyID())); ++ DatanodeStorageInfo[] chosenTargets = blockplacement.chooseTarget( ++ getBc().getName(), getAdditionalReplRequired(), getSrcNodes()[0], ++ getLiveReplicaStorages(), false, excludedNodes, ++ getBlock().getNumBytes(), ++ storagePolicySuite.getPolicy(getBc().getStoragePolicyID())); ++ setTargets(chosenTargets); + } finally { - srcNode.decrementPendingReplicationWithoutTargets(); ++ getSrcNodes()[0].decrementPendingReplicationWithoutTargets(); + } + } - - DatanodeStorageInfo[] getTargets() { - return targets; - } - - void resetTargets() { - this.targets = null; - } - - List<DatanodeDescriptor> getContainingNodes() { - return Collections.unmodifiableList(containingNodes); - } - - public int getPriority() { - return priority; - } - - public BlockInfo getBlock() { - return block; - } - - public DatanodeDescriptor getSrcNode() { - return srcNode; - } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java ---------------------------------------------------------------------- diff --cc hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java index 3c77120,0000000..6cc1dcd mode 100644,000000..100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoStriped.java @@@ -1,250 -1,0 +1,220 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo.AddBlockResult; +import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; + +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_DATA_BLOCKS; +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.NUM_PARITY_BLOCKS; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +/** + * Test {@link BlockInfoStriped} + */ +public class TestBlockInfoStriped { + private static final int TOTAL_NUM_BLOCKS = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS; + private static final long BASE_ID = -1600; + private static final Block baseBlock = new Block(BASE_ID); + private static final ErasureCodingPolicy testECPolicy + = ErasureCodingPolicyManager.getSystemDefaultPolicy(); + private final BlockInfoStriped info = new BlockInfoStriped(baseBlock, + testECPolicy); + + private Block[] createReportedBlocks(int num) { + Block[] blocks = new Block[num]; + for (int i = 0; i < num; i++) { + blocks[i] = new Block(BASE_ID + i); + } + return blocks; + } + + /** + * Test adding storage and reported block + */ + @Test + public void testAddStorage() { + // first add NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS storages, i.e., a complete + // group of blocks/storages + DatanodeStorageInfo[] storageInfos = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + int i = 0; + for (; i < storageInfos.length; i += 2) { + info.addStorage(storageInfos[i], blocks[i]); + Assert.assertEquals(i/2 + 1, info.numNodes()); + } + i /= 2; + for (int j = 1; j < storageInfos.length; j += 2) { + Assert.assertTrue(info.addStorage(storageInfos[j], blocks[j])); + Assert.assertEquals(i + (j+1)/2, info.numNodes()); + } + + // check + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from the same storage twice + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + Assert.assertTrue(info.addStorage(storage, blocks[i++])); + } + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS, indices.length); + i = 0; + for (DatanodeStorageInfo storage : storageInfos) { + int index = info.findStorageInfo(storage); + Assert.assertEquals(i++, index); + Assert.assertEquals(index, indices[index]); + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storageInfos2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + // only add the second half of info2 + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + info.addStorage(storageInfos2[i], blocks[i % TOTAL_NUM_BLOCKS]); + Assert.assertEquals(i + 1, info.getCapacity()); + Assert.assertEquals(i + 1, info.numNodes()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(i + 1, indices.length); + } + for (i = TOTAL_NUM_BLOCKS; i < storageInfos2.length; i++) { + int index = info.findStorageInfo(storageInfos2[i]); + Assert.assertEquals(i++, index); + Assert.assertEquals(index - TOTAL_NUM_BLOCKS, indices[index]); + } + } + + @Test + public void testRemoveStorage() { + // first add TOTAL_NUM_BLOCKS into the BlockInfoStriped + DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS); + Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); + for (int i = 0; i < storages.length; i++) { + info.addStorage(storages[i], blocks[i]); + } + + // remove two storages + info.removeStorage(storages[0]); + info.removeStorage(storages[2]); + + // check + Assert.assertEquals(TOTAL_NUM_BLOCKS, info.getCapacity()); + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + byte[] indices = (byte[]) Whitebox.getInternalState(info, "indices"); + for (int i = 0; i < storages.length; i++) { + int index = info.findStorageInfo(storages[i]); + if (i != 0 && i != 2) { + Assert.assertEquals(i, index); + Assert.assertEquals(index, indices[index]); + } else { + Assert.assertEquals(-1, index); + Assert.assertEquals(-1, indices[i]); + } + } + + // the same block is reported from another storage + DatanodeStorageInfo[] storages2 = DFSTestUtil.createDatanodeStorageInfos( + TOTAL_NUM_BLOCKS * 2); + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + info.addStorage(storages2[i], blocks[i % TOTAL_NUM_BLOCKS]); + } + // now we should have 8 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + int j = TOTAL_NUM_BLOCKS; + for (int i = TOTAL_NUM_BLOCKS; i < storages2.length; i++) { + int index = info.findStorageInfo(storages2[i]); + if (i == TOTAL_NUM_BLOCKS || i == TOTAL_NUM_BLOCKS + 2) { + Assert.assertEquals(i - TOTAL_NUM_BLOCKS, index); + } else { + Assert.assertEquals(j++, index); + } + } + + // remove the storages from storages2 + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + info.removeStorage(storages2[i + TOTAL_NUM_BLOCKS]); + } + // now we should have 3 storages + Assert.assertEquals(TOTAL_NUM_BLOCKS - 2, info.numNodes()); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, info.getCapacity()); + indices = (byte[]) Whitebox.getInternalState(info, "indices"); + Assert.assertEquals(TOTAL_NUM_BLOCKS * 2 - 2, indices.length); + for (int i = 0; i < TOTAL_NUM_BLOCKS; i++) { + if (i == 0 || i == 2) { + int index = info.findStorageInfo(storages2[i + TOTAL_NUM_BLOCKS]); + Assert.assertEquals(-1, index); + } else { + int index = info.findStorageInfo(storages[i]); + Assert.assertEquals(i, index); + } + } + for (int i = TOTAL_NUM_BLOCKS; i < TOTAL_NUM_BLOCKS * 2 - 2; i++) { + Assert.assertEquals(-1, indices[i]); + Assert.assertNull(info.getDatanode(i)); + } + } + + @Test - public void testReplaceBlock() { - DatanodeStorageInfo[] storages = DFSTestUtil.createDatanodeStorageInfos( - TOTAL_NUM_BLOCKS); - Block[] blocks = createReportedBlocks(TOTAL_NUM_BLOCKS); - // add block/storage 0, 2, 4 into the BlockInfoStriped - for (int i = 0; i < storages.length; i += 2) { - Assert.assertEquals(AddBlockResult.ADDED, - storages[i].addBlock(info, blocks[i])); - } - - BlockInfoStriped newBlockInfo = new BlockInfoStriped(info, - info.getErasureCodingPolicy()); - newBlockInfo.setBlockCollectionId(info.getBlockCollectionId()); - info.replaceBlock(newBlockInfo); - - // make sure the newBlockInfo is correct - byte[] indices = (byte[]) Whitebox.getInternalState(newBlockInfo, "indices"); - for (int i = 0; i < storages.length; i += 2) { - int index = newBlockInfo.findStorageInfo(storages[i]); - Assert.assertEquals(i, index); - Assert.assertEquals(index, indices[i]); - - // make sure the newBlockInfo is added to the linked list of the storage - Assert.assertSame(newBlockInfo, storages[i].getBlockListHeadForTesting()); - Assert.assertEquals(1, storages[i].numBlocks()); - Assert.assertNull(newBlockInfo.getNext()); - } - } - - @Test + public void testWrite() { + long blkID = 1; + long numBytes = 1; + long generationStamp = 1; + ByteBuffer byteBuffer = ByteBuffer.allocate(Long.SIZE / Byte.SIZE * 3); + byteBuffer.putLong(blkID).putLong(numBytes).putLong(generationStamp); + + ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); + DataOutput out = new DataOutputStream(byteStream); + BlockInfoStriped blk = new BlockInfoStriped(new Block(blkID, numBytes, + generationStamp), testECPolicy); + + try { + blk.write(out); + } catch(Exception ex) { + fail("testWrite error:" + ex.getMessage()); + } + assertEquals(byteBuffer.array().length, byteStream.toByteArray().length); + assertArrayEquals(byteBuffer.array(), byteStream.toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/53358fe6/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java ----------------------------------------------------------------------
