http://git-wip-us.apache.org/repos/asf/hadoop/blob/ecf36348/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 508da85..a08bd2f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -17,12 +17,12 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; import static org.apache.hadoop.util.ExitUtil.terminate; import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -44,6 +44,7 @@ import javax.management.ObjectName; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; @@ -55,10 +56,11 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ErasureCodingZone; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -79,6 +81,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.StripedBlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; @@ -88,8 +91,14 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.io.erasurecode.ECSchema; + +import static org.apache.hadoop.hdfs.protocol.HdfsConstants.BLOCK_STRIPED_CELL_SIZE; +import static org.apache.hadoop.hdfs.util.StripedBlockUtil.getInternalBlockLength; + import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; @@ -187,7 +196,11 @@ public class BlockManager implements BlockStatsMXBean { /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */ final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap(); - /** Blocks to be invalidated. */ + /** + * Blocks to be invalidated. + * For a striped block to invalidate, we should track its individual internal + * blocks. + */ private final InvalidateBlocks invalidateBlocks; /** @@ -203,8 +216,8 @@ public class BlockManager implements BlockStatsMXBean { * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. */ - public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap = - new TreeMap<String, LightWeightLinkedSet<Block>>(); + public final Map<String, LightWeightLinkedSet<BlockInfo>> excessReplicateMap = + new TreeMap<>(); /** * Store set of Blocks that need to be replicated 1 or more times. @@ -271,12 +284,15 @@ public class BlockManager implements BlockStatsMXBean { private double replicationQueuesInitProgress = 0.0; /** for block replicas placement */ - private BlockPlacementPolicy blockplacement; + private BlockPlacementPolicies placementPolicies; private final BlockStoragePolicySuite storagePolicySuite; /** Check whether name system is running before terminating */ private boolean checkNSRunning = true; + /** Check whether there are any non-EC blocks using StripedID */ + private boolean hasNonEcBlockUsingStripedID = false; + public BlockManager(final Namesystem namesystem, final Configuration conf) throws IOException { this.namesystem = namesystem; @@ -292,7 +308,7 @@ public class BlockManager implements BlockStatsMXBean { // Compute the map capacity by allocating 2% of total memory blocksMap = new BlocksMap( LightWeightGSet.computeCapacity(2.0, "BlocksMap")); - blockplacement = BlockPlacementPolicy.getInstance( + placementPolicies = new BlockPlacementPolicies( conf, datanodeManager.getFSClusterStats(), datanodeManager.getNetworkTopology(), datanodeManager.getHost2DatanodeMap()); @@ -494,15 +510,7 @@ public class BlockManager implements BlockStatsMXBean { @VisibleForTesting public BlockPlacementPolicy getBlockPlacementPolicy() { - return blockplacement; - } - - /** Set BlockPlacementPolicy */ - public void setBlockPlacementPolicy(BlockPlacementPolicy newpolicy) { - if (newpolicy == null) { - throw new HadoopIllegalArgumentException("newpolicy == null"); - } - this.blockplacement = newpolicy; + return placementPolicies.getPolicy(false); } /** Dump meta data to out. */ @@ -552,9 +560,9 @@ public class BlockManager implements BlockStatsMXBean { NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used - chooseSourceDatanode(block, containingNodes, + chooseSourceDatanodes(getStoredBlock(block), containingNodes, containingLiveReplicasNodes, numReplicas, - UnderReplicatedBlocks.LEVEL); + new LinkedList<Short>(), UnderReplicatedBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count @@ -601,11 +609,28 @@ public class BlockManager implements BlockStatsMXBean { return maxReplicationStreams; } - /** - * @return true if the block has minimum replicas - */ - public boolean checkMinReplication(BlockInfo block) { - return (countNodes(block).liveReplicas() >= minReplication); + public int getDefaultStorageNum(BlockInfo block) { + if (block.isStriped()) { + return ((BlockInfoStriped) block).getRealTotalBlockNum(); + } else { + return defaultReplication; + } + } + + public short getMinStorageNum(BlockInfo block) { + if (block.isStriped()) { + return ((BlockInfoStriped) block).getRealDataBlockNum(); + } else { + return minReplication; + } + } + + public boolean hasMinStorage(BlockInfo block) { + return countNodes(block).liveReplicas() >= getMinStorageNum(block); + } + + public boolean hasMinStorage(BlockInfo block, int liveNum) { + return liveNum >= getMinStorageNum(block); } /** @@ -617,16 +642,21 @@ public class BlockManager implements BlockStatsMXBean { * @throws IOException if the block does not have at least a minimal number * of replicas reported from data-nodes. */ - private static boolean commitBlock( - final BlockInfoContiguousUnderConstruction block, final Block commitBlock) - throws IOException { - if (block.getBlockUCState() == BlockUCState.COMMITTED) - return false; - assert block.getNumBytes() <= commitBlock.getNumBytes() : - "commitBlock length is less than the stored one " - + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); - block.commitBlock(commitBlock); - return true; + private static boolean commitBlock(final BlockInfo block, + final Block commitBlock) throws IOException { + if (block instanceof BlockInfoUnderConstruction + && block.getBlockUCState() != BlockUCState.COMMITTED) { + final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block; + + assert block.getNumBytes() <= commitBlock.getNumBytes() : + "commitBlock length is less than the stored one " + + commitBlock.getNumBytes() + " vs. " + block.getNumBytes(); + + uc.commitBlock(commitBlock); + return true; + } + + return false; } /** @@ -649,10 +679,10 @@ public class BlockManager implements BlockStatsMXBean { if(lastBlock.isComplete()) return false; // already completed (e.g. by syncBlock) - final boolean b = commitBlock( - (BlockInfoContiguousUnderConstruction) lastBlock, commitBlock); - if(countNodes(lastBlock).liveReplicas() >= minReplication) - completeBlock(bc, bc.numBlocks()-1, false); + final boolean b = commitBlock(lastBlock, commitBlock); + if (hasMinStorage(lastBlock)) { + completeBlock(bc, bc.numBlocks() - 1, false); + } return b; } @@ -665,21 +695,28 @@ public class BlockManager implements BlockStatsMXBean { */ private BlockInfo completeBlock(final BlockCollection bc, final int blkIndex, boolean force) throws IOException { - if(blkIndex < 0) + if (blkIndex < 0) { return null; + } BlockInfo curBlock = bc.getBlocks()[blkIndex]; - if(curBlock.isComplete()) + if (curBlock.isComplete()) { return curBlock; - BlockInfoContiguousUnderConstruction ucBlock = - (BlockInfoContiguousUnderConstruction) curBlock; - int numNodes = ucBlock.numNodes(); - if (!force && numNodes < minReplication) + } + + int numNodes = curBlock.numNodes(); + if (!force && !hasMinStorage(curBlock, numNodes)) { throw new IOException("Cannot complete block: " + "block does not satisfy minimal replication requirement."); - if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED) + } + if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) { throw new IOException( "Cannot complete block: block has not been COMMITTED by the client"); - BlockInfo completeBlock = ucBlock.convertToCompleteBlock(); + } + + final BlockInfo completeBlock + = !(curBlock instanceof BlockInfoUnderConstruction)? curBlock + : ((BlockInfoUnderConstruction)curBlock).convertToCompleteBlock(); + // replace penultimate block in file bc.setBlock(blkIndex, completeBlock); @@ -690,8 +727,10 @@ public class BlockManager implements BlockStatsMXBean { // a "forced" completion when a file is getting closed by an // OP_CLOSE edit on the standby). namesystem.adjustSafeModeBlockTotals(0, 1); + final int minStorage = curBlock.isStriped() ? + ((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication; namesystem.incrementSafeBlockCount( - Math.min(numNodes, minReplication)); + Math.min(numNodes, minStorage), curBlock); // replace block in the blocksMap return blocksMap.replaceBlock(completeBlock); @@ -700,10 +739,11 @@ public class BlockManager implements BlockStatsMXBean { private BlockInfo completeBlock(final BlockCollection bc, final BlockInfo block, boolean force) throws IOException { BlockInfo[] fileBlocks = bc.getBlocks(); - for(int idx = 0; idx < fileBlocks.length; idx++) - if(fileBlocks[idx] == block) { + for (int idx = 0; idx < fileBlocks.length; idx++) { + if (fileBlocks[idx] == block) { return completeBlock(bc, idx, force); } + } return block; } @@ -713,8 +753,10 @@ public class BlockManager implements BlockStatsMXBean { * when tailing edit logs as a Standby. */ public BlockInfo forceCompleteBlock(final BlockCollection bc, - final BlockInfoContiguousUnderConstruction block) throws IOException { - block.commitBlock(block); + final BlockInfo block) throws IOException { + if (block instanceof BlockInfoUnderConstruction) { + ((BlockInfoUnderConstruction)block).commitBlock(block); + } return completeBlock(bc, block, true); } @@ -744,8 +786,10 @@ public class BlockManager implements BlockStatsMXBean { DatanodeStorageInfo[] targets = getStorages(oldBlock); - BlockInfoContiguousUnderConstruction ucBlock = - bc.setLastBlock(oldBlock, targets); + // convert the last block to UC + bc.convertLastBlockToUC(oldBlock, targets); + // get the new created uc block + BlockInfo ucBlock = bc.getLastBlock(); blocksMap.replaceBlock(ucBlock); // Remove block from replication queue. @@ -756,14 +800,17 @@ public class BlockManager implements BlockStatsMXBean { // remove this block from the list of pending blocks to be deleted. for (DatanodeStorageInfo storage : targets) { - invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock); + final Block b = getBlockOnStorage(oldBlock, storage); + if (b != null) { + invalidateBlocks.remove(storage.getDatanodeDescriptor(), b); + } } // Adjust safe-mode totals, since under-construction blocks don't // count in safe-mode. namesystem.adjustSafeModeBlockTotals( // decrement safe if we had enough - targets.length >= minReplication ? -1 : 0, + hasMinStorage(oldBlock, targets.length) ? -1 : 0, // always decrement total blocks -1); @@ -775,23 +822,25 @@ public class BlockManager implements BlockStatsMXBean { /** * Get all valid locations of the block */ - private List<DatanodeStorageInfo> getValidLocations(Block block) { + private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) { final List<DatanodeStorageInfo> locations = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block)); for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { // filter invalidate replicas - if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) { + Block b = getBlockOnStorage(block, storage); + if(b != null && + !invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) { locations.add(storage); } } return locations; } - + private List<LocatedBlock> createLocatedBlockList( final BlockInfo[] blocks, final long offset, final long length, final int nrBlocksToReturn, final AccessMode mode) throws IOException { - int curBlk = 0; + int curBlk; long curPos = 0, blkSize = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -804,10 +853,10 @@ public class BlockManager implements BlockStatsMXBean { } if (nrBlocks > 0 && curBlk == nrBlocks) // offset >= end of file - return Collections.<LocatedBlock>emptyList(); + return Collections.emptyList(); long endOff = offset + length; - List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length); + List<LocatedBlock> results = new ArrayList<>(blocks.length); do { results.add(createLocatedBlock(blocks[curBlk], curPos, mode)); curPos += blocks[curBlk].getNumBytes(); @@ -820,7 +869,7 @@ public class BlockManager implements BlockStatsMXBean { private LocatedBlock createLocatedBlock(final BlockInfo[] blocks, final long endPos, final AccessMode mode) throws IOException { - int curBlk = 0; + int curBlk; long curPos = 0; int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length; for (curBlk = 0; curBlk < nrBlocks; curBlk++) { @@ -833,7 +882,7 @@ public class BlockManager implements BlockStatsMXBean { return createLocatedBlock(blocks[curBlk], curPos, mode); } - + private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos, final AccessMode mode) throws IOException { final LocatedBlock lb = createLocatedBlock(blk, pos); @@ -844,19 +893,25 @@ public class BlockManager implements BlockStatsMXBean { } /** @return a LocatedBlock for the given block */ - private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos - ) throws IOException { - if (blk instanceof BlockInfoContiguousUnderConstruction) { - if (blk.isComplete()) { - throw new IOException( - "blk instanceof BlockInfoUnderConstruction && blk.isComplete()" - + ", blk=" + blk); + private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos) { + if (!blk.isComplete()) { + if (blk.isStriped()) { + final BlockInfoStripedUnderConstruction uc = + (BlockInfoStripedUnderConstruction) blk; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); + final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), + blk); + return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, + false); + } else { + assert blk instanceof BlockInfoContiguousUnderConstruction; + final BlockInfoContiguousUnderConstruction uc = + (BlockInfoContiguousUnderConstruction) blk; + final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); + final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), + blk); + return newLocatedBlock(eb, storages, pos, false); } - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction) blk; - final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); - final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return newLocatedBlock(eb, storages, pos, false); } // get block locations @@ -873,13 +928,21 @@ public class BlockManager implements BlockStatsMXBean { numCorruptNodes == numNodes; final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes; final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines]; - int j = 0; + final int[] blockIndices = blk.isStriped() ? new int[numMachines] : null; + int j = 0, i = 0; if (numMachines > 0) { for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) { final DatanodeDescriptor d = storage.getDatanodeDescriptor(); final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d); - if (isCorrupt || (!replicaCorrupt)) + if (isCorrupt || (!replicaCorrupt)) { machines[j++] = storage; + // TODO this can be more efficient + if (blockIndices != null) { + int index = ((BlockInfoStriped) blk).getStorageBlockIndex(storage); + assert index >= 0; + blockIndices[i++] = index; + } + } } } assert j == machines.length : @@ -889,7 +952,9 @@ public class BlockManager implements BlockStatsMXBean { " numCorrupt: " + numCorruptNodes + " numCorruptRepls: " + numCorruptReplicas; final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); - return newLocatedBlock(eb, machines, pos, isCorrupt); + return blockIndices == null ? + newLocatedBlock(eb, machines, pos, isCorrupt) : + newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); } /** Create a LocatedBlocks. */ @@ -897,14 +962,18 @@ public class BlockManager implements BlockStatsMXBean { final long fileSizeExcludeBlocksUnderConstruction, final boolean isFileUnderConstruction, final long offset, final long length, final boolean needBlockToken, - final boolean inSnapshot, FileEncryptionInfo feInfo) + final boolean inSnapshot, FileEncryptionInfo feInfo, + ErasureCodingZone ecZone) throws IOException { assert namesystem.hasReadLock(); + final ECSchema schema = ecZone != null ? ecZone.getSchema() : null; + final int cellSize = ecZone != null ? ecZone.getCellSize() : 0; if (blocks == null) { return null; } else if (blocks.length == 0) { return new LocatedBlocks(0, isFileUnderConstruction, - Collections.<LocatedBlock>emptyList(), null, false, feInfo); + Collections.<LocatedBlock> emptyList(), null, false, feInfo, schema, + cellSize); } else { if (LOG.isDebugEnabled()) { LOG.debug("blocks = " + java.util.Arrays.asList(blocks)); @@ -927,9 +996,9 @@ public class BlockManager implements BlockStatsMXBean { fileSizeExcludeBlocksUnderConstruction, mode); isComplete = true; } - return new LocatedBlocks( - fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction, - locatedblocks, lastlb, isComplete, feInfo); + return new LocatedBlocks(fileSizeExcludeBlocksUnderConstruction, + isFileUnderConstruction, locatedblocks, lastlb, isComplete, feInfo, + schema, cellSize); } } @@ -944,9 +1013,23 @@ public class BlockManager implements BlockStatsMXBean { final AccessMode mode) throws IOException { if (isBlockTokenEnabled()) { // Use cached UGI if serving RPC calls. - b.setBlockToken(blockTokenSecretManager.generateToken( - NameNode.getRemoteUser().getShortUserName(), - b.getBlock(), EnumSet.of(mode))); + if (b.isStriped()) { + LocatedStripedBlock sb = (LocatedStripedBlock) b; + int[] indices = sb.getBlockIndices(); + Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; + ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock()); + for (int i = 0; i < indices.length; i++) { + internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]); + blockTokens[i] = blockTokenSecretManager.generateToken( + NameNode.getRemoteUser().getShortUserName(), + internalBlock, EnumSet.of(mode)); + } + sb.setBlockTokens(blockTokens); + } else { + b.setBlockToken(blockTokenSecretManager.generateToken( + NameNode.getRemoteUser().getShortUserName(), + b.getBlock(), EnumSet.of(mode))); + } } } @@ -1077,7 +1160,7 @@ public class BlockManager implements BlockStatsMXBean { /** Remove the blocks associated to the given datanode. */ void removeBlocksAssociatedTo(final DatanodeDescriptor node) { - final Iterator<? extends Block> it = node.getBlockIterator(); + final Iterator<BlockInfo> it = node.getBlockIterator(); while(it.hasNext()) { removeStoredBlock(it.next(), node); } @@ -1091,12 +1174,15 @@ public class BlockManager implements BlockStatsMXBean { /** Remove the blocks associated to the given DatanodeStorageInfo. */ void removeBlocksAssociatedTo(final DatanodeStorageInfo storageInfo) { assert namesystem.hasWriteLock(); - final Iterator<? extends Block> it = storageInfo.getBlockIterator(); + final Iterator<BlockInfo> it = storageInfo.getBlockIterator(); DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); while(it.hasNext()) { - Block block = it.next(); + BlockInfo block = it.next(); removeStoredBlock(block, node); - invalidateBlocks.remove(node, block); + final Block b = getBlockOnStorage(block, storageInfo); + if (b != null) { + invalidateBlocks.remove(node, b); + } } namesystem.checkSafeMode(); } @@ -1116,22 +1202,32 @@ public class BlockManager implements BlockStatsMXBean { * Adds block to list of blocks which will be invalidated on all its * datanodes. */ - private void addToInvalidates(Block b) { + private void addToInvalidates(BlockInfo storedBlock) { if (!namesystem.isPopulatingReplQueues()) { return; } StringBuilder datanodes = new StringBuilder(); - for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) { + for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock, + State.NORMAL)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - invalidateBlocks.add(b, node, false); - datanodes.append(node).append(" "); + final Block b = getBlockOnStorage(storedBlock, storage); + if (b != null) { + invalidateBlocks.add(b, node, false); + datanodes.append(node).append(" "); + } } if (datanodes.length() != 0) { - blockLog.debug("BLOCK* addToInvalidates: {} {}", b, + blockLog.info("BLOCK* addToInvalidates: {} {}", storedBlock, datanodes.toString()); } } + private Block getBlockOnStorage(BlockInfo storedBlock, + DatanodeStorageInfo storage) { + return storedBlock.isStriped() ? + ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock; + } + /** * Remove all block invalidation tasks under this datanode UUID; * used when a datanode registers with a new UUID and the old one @@ -1155,7 +1251,8 @@ public class BlockManager implements BlockStatsMXBean { public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, final DatanodeInfo dn, String storageID, String reason) throws IOException { assert namesystem.hasWriteLock(); - final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock()); + final Block reportedBlock = blk.getLocalBlock(); + final BlockInfo storedBlock = getStoredBlock(reportedBlock); if (storedBlock == null) { // Check if the replica is in the blockMap, if not // ignore the request for now. This could happen when BlockScanner @@ -1172,45 +1269,53 @@ public class BlockManager implements BlockStatsMXBean { + ") does not exist"); } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock, blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), storageID == null ? null : node.getStorageInfo(storageID), node); } /** - * - * @param b + * Mark a replica (of a contiguous block) or an internal block (of a striped + * block group) as corrupt. + * @param b Indicating the reported bad block and the corresponding BlockInfo + * stored in blocksMap. * @param storageInfo storage that contains the block, if known. null otherwise. - * @throws IOException */ private void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeStorageInfo storageInfo, DatanodeDescriptor node) throws IOException { - if (b.corrupted.isDeleted()) { - blockLog.debug("BLOCK markBlockAsCorrupt: {} cannot be marked as" + + if (b.stored.isDeleted()) { + blockLog.info("BLOCK markBlockAsCorrupt: {} cannot be marked as" + " corrupt as it does not belong to any file", b); addToInvalidates(b.corrupted, node); return; } short expectedReplicas = - b.corrupted.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(b.stored.getBlockCollection(), b.stored); // Add replica to the data-node if it is not already there if (storageInfo != null) { - storageInfo.addBlock(b.stored); + storageInfo.addBlock(b.stored, b.corrupted); } - // Add this replica to corruptReplicas Map - corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, + // Add this replica to corruptReplicas Map. For striped blocks, we always + // use the id of whole striped block group when adding to corruptReplicas + Block corrupted = new Block(b.corrupted); + if (b.stored.isStriped()) { + corrupted.setBlockId(b.stored.getBlockId()); + } + corruptReplicas.addToCorruptReplicasMap(corrupted, node, b.reason, b.reasonCode); NumberReplicas numberOfReplicas = countNodes(b.stored); boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= expectedReplicas; - boolean minReplicationSatisfied = - numberOfReplicas.liveReplicas() >= minReplication; + + boolean minReplicationSatisfied = hasMinStorage(b.stored, + numberOfReplicas.liveReplicas()); + boolean hasMoreCorruptReplicas = minReplicationSatisfied && (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > expectedReplicas; @@ -1225,7 +1330,7 @@ public class BlockManager implements BlockStatsMXBean { if (hasEnoughLiveReplicas || hasMoreCorruptReplicas || corruptedDuringWrite) { // the block is over-replicated so invalidate the replicas immediately - invalidateBlock(b, node); + invalidateBlock(b, node, numberOfReplicas); } else if (namesystem.isPopulatingReplQueues()) { // add the block to neededReplication updateNeededReplications(b.stored, -1, 0); @@ -1237,8 +1342,8 @@ public class BlockManager implements BlockStatsMXBean { * @return true if the block was successfully invalidated and no longer * present in the BlocksMap */ - private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn - ) throws IOException { + private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn, + NumberReplicas nr) throws IOException { blockLog.debug("BLOCK* invalidateBlock: {} on {}", b, dn); DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { @@ -1247,7 +1352,6 @@ public class BlockManager implements BlockStatsMXBean { } // Check how many copies we have of the block - NumberReplicas nr = countNodes(b.stored); if (nr.replicasOnStaleNodes() > 0) { blockLog.debug("BLOCK* invalidateBlocks: postponing " + "invalidation of {} on {} because {} replica(s) are located on " + @@ -1318,15 +1422,15 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Scan blocks in {@link #neededReplications} and assign replication - * work to data-nodes they belong to. + * Scan blocks in {@link #neededReplications} and assign recovery + * (replication or erasure coding) work to data-nodes they belong to. * * The number of process blocks equals either twice the number of live * data-nodes or the number of under-replicated blocks whichever is less. * * @return number of blocks scheduled for replication during this iteration. */ - int computeReplicationWork(int blocksToProcess) { + int computeBlockRecoveryWork(int blocksToProcess) { List<List<BlockInfo>> blocksToReplicate = null; namesystem.writeLock(); try { @@ -1336,30 +1440,32 @@ public class BlockManager implements BlockStatsMXBean { } finally { namesystem.writeUnlock(); } - return computeReplicationWorkForBlocks(blocksToReplicate); + return computeRecoveryWorkForBlocks(blocksToReplicate); } - /** Replicate a set of blocks + /** + * Recover a set of blocks to full strength through replication or + * erasure coding * - * @param blocksToReplicate blocks to be replicated, for each priority + * @param blocksToRecover blocks to be recovered, for each priority * @return the number of blocks scheduled for replication */ @VisibleForTesting - int computeReplicationWorkForBlocks(List<List<BlockInfo>> blocksToReplicate) { + int computeRecoveryWorkForBlocks(List<List<BlockInfo>> blocksToRecover) { int requiredReplication, numEffectiveReplicas; List<DatanodeDescriptor> containingNodes; - DatanodeDescriptor srcNode; - BlockCollection bc = null; + BlockCollection bc; int additionalReplRequired; int scheduledWork = 0; - List<ReplicationWork> work = new LinkedList<ReplicationWork>(); + List<BlockRecoveryWork> recovWork = new LinkedList<>(); + // Step 1: categorize at-risk blocks into replication and EC tasks namesystem.writeLock(); try { synchronized (neededReplications) { - for (int priority = 0; priority < blocksToReplicate.size(); priority++) { - for (BlockInfo block : blocksToReplicate.get(priority)) { + for (int priority = 0; priority < blocksToRecover.size(); priority++) { + for (BlockInfo block : blocksToRecover.get(priority)) { // block should belong to a file bc = blocksMap.getBlockCollection(block); // abandoned block or block reopened for append @@ -1370,31 +1476,34 @@ public class BlockManager implements BlockStatsMXBean { continue; } - requiredReplication = bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // get a source data-node - containingNodes = new ArrayList<DatanodeDescriptor>(); - List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<DatanodeStorageInfo>(); + containingNodes = new ArrayList<>(); + List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>(); NumberReplicas numReplicas = new NumberReplicas(); - srcNode = chooseSourceDatanode( - block, containingNodes, liveReplicaNodes, numReplicas, - priority); - if(srcNode == null) { // block can not be replicated from any node - LOG.debug("Block " + block + " cannot be repl from any node"); + List<Short> liveBlockIndices = new ArrayList<>(); + final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block, + containingNodes, liveReplicaNodes, numReplicas, + liveBlockIndices, priority); + if(srcNodes == null || srcNodes.length == 0) { + // block can not be replicated from any node + LOG.debug("Block " + block + " cannot be recovered " + + "from any node"); continue; } - // liveReplicaNodes can include READ_ONLY_SHARED replicas which are + // liveReplicaNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count assert liveReplicaNodes.size() >= numReplicas.liveReplicas(); // do not schedule more if enough replicas is already pending numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplications.getNumReplicas(block); - + if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1408,9 +1517,20 @@ public class BlockManager implements BlockStatsMXBean { } else { additionalReplRequired = 1; // Needed on a new rack } - work.add(new ReplicationWork(block, bc, srcNode, - containingNodes, liveReplicaNodes, additionalReplRequired, - priority)); + if (block.isStriped()) { + short[] indices = new short[liveBlockIndices.size()]; + for (int i = 0 ; i < liveBlockIndices.size(); i++) { + indices[i] = liveBlockIndices.get(i); + } + ErasureCodingWork ecw = new ErasureCodingWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority, indices); + recovWork.add(ecw); + } else { + recovWork.add(new ReplicationWork(block, bc, srcNodes, + containingNodes, liveReplicaNodes, additionalReplRequired, + priority)); + } } } } @@ -1418,8 +1538,9 @@ public class BlockManager implements BlockStatsMXBean { namesystem.writeUnlock(); } + // Step 2: choose target nodes for each recovery task final Set<Node> excludedNodes = new HashSet<Node>(); - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ // Exclude all of the containing nodes from being targets. // This list includes decommissioning or corrupt nodes. excludedNodes.clear(); @@ -1430,12 +1551,15 @@ public class BlockManager implements BlockStatsMXBean { // choose replication targets: NOT HOLDING THE GLOBAL LOCK // It is costly to extract the filename for which chooseTargets is called, // so for now we pass in the block collection itself. - rw.chooseTargets(blockplacement, storagePolicySuite, excludedNodes); + final BlockPlacementPolicy placementPolicy = + placementPolicies.getPolicy(rw.block.isStriped()); + rw.chooseTargets(placementPolicy, storagePolicySuite, excludedNodes); } + // Step 3: add tasks to the DN namesystem.writeLock(); try { - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ final DatanodeStorageInfo[] targets = rw.targets; if(targets == null || targets.length == 0){ rw.targets = null; @@ -1454,7 +1578,7 @@ public class BlockManager implements BlockStatsMXBean { rw.targets = null; continue; } - requiredReplication = bc.getPreferredBlockReplication(); + requiredReplication = getExpectedReplicaNum(bc, block); // do not schedule more if enough replicas is already pending NumberReplicas numReplicas = countNodes(block); @@ -1463,7 +1587,7 @@ public class BlockManager implements BlockStatsMXBean { if (numEffectiveReplicas >= requiredReplication) { if ( (pendingReplications.getNumReplicas(block) > 0) || - (blockHasEnoughRacks(block)) ) { + (blockHasEnoughRacks(block, requiredReplication)) ) { neededReplications.remove(block, priority); // remove from neededReplications rw.targets = null; blockLog.debug("BLOCK* Removing {} from neededReplications as" + @@ -1473,8 +1597,8 @@ public class BlockManager implements BlockStatsMXBean { } if ( (numReplicas.liveReplicas() >= requiredReplication) && - (!blockHasEnoughRacks(block)) ) { - if (rw.srcNode.getNetworkLocation().equals( + (!blockHasEnoughRacks(block, requiredReplication)) ) { + if (rw.srcNodes[0].getNetworkLocation().equals( targets[0].getDatanodeDescriptor().getNetworkLocation())) { //No use continuing, unless a new rack in this case continue; @@ -1482,7 +1606,32 @@ public class BlockManager implements BlockStatsMXBean { } // Add block to the to be replicated list - rw.srcNode.addBlockToBeReplicated(block, targets); + if (block.isStriped()) { + assert rw instanceof ErasureCodingWork; + assert rw.targets.length > 0; + String src = block.getBlockCollection().getName(); + ErasureCodingZone ecZone = null; + try { + ecZone = namesystem.getErasureCodingZoneForPath(src); + } catch (IOException e) { + blockLog + .warn("Failed to get the EC zone for the file {} ", src); + } + if (ecZone == null) { + blockLog.warn("No EC schema found for the file {}. " + + "So cannot proceed for recovery", src); + // TODO: we may have to revisit later for what we can do better to + // handle this case. + continue; + } + rw.targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded( + new ExtendedBlock(namesystem.getBlockPoolId(), block), + rw.srcNodes, rw.targets, + ((ErasureCodingWork) rw).liveBlockIndicies, + ecZone.getSchema(), ecZone.getCellSize()); + } else { + rw.srcNodes[0].addBlockToBeReplicated(block, targets); + } scheduledWork++; DatanodeStorageInfo.incrementBlocksScheduled(targets); @@ -1506,15 +1655,15 @@ public class BlockManager implements BlockStatsMXBean { if (blockLog.isInfoEnabled()) { // log which blocks have been scheduled for replication - for(ReplicationWork rw : work){ + for(BlockRecoveryWork rw : recovWork){ DatanodeStorageInfo[] targets = rw.targets; if (targets != null && targets.length != 0) { StringBuilder targetList = new StringBuilder("datanode(s)"); - for (int k = 0; k < targets.length; k++) { + for (DatanodeStorageInfo target : targets) { targetList.append(' '); - targetList.append(targets[k].getDatanodeDescriptor()); + targetList.append(target.getDatanodeDescriptor()); } - blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNode, + blockLog.debug("BLOCK* ask {} to replicate {} to {}", rw.srcNodes, rw.block, targetList); } } @@ -1530,7 +1679,7 @@ public class BlockManager implements BlockStatsMXBean { /** Choose target for WebHDFS redirection. */ public DatanodeStorageInfo[] chooseTarget4WebHDFS(String src, DatanodeDescriptor clientnode, Set<Node> excludes, long blocksize) { - return blockplacement.chooseTarget(src, 1, clientnode, + return placementPolicies.getPolicy(false).chooseTarget(src, 1, clientnode, Collections.<DatanodeStorageInfo>emptyList(), false, excludes, blocksize, storagePolicySuite.getDefaultPolicy()); } @@ -1542,9 +1691,10 @@ public class BlockManager implements BlockStatsMXBean { List<DatanodeStorageInfo> chosen, Set<Node> excludes, long blocksize, - byte storagePolicyID) { - + byte storagePolicyID, + boolean isStriped) { final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); + final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped); return blockplacement.chooseTarget(src, numAdditionalNodes, clientnode, chosen, true, excludes, blocksize, storagePolicy); } @@ -1562,10 +1712,12 @@ public class BlockManager implements BlockStatsMXBean { final Set<Node> excludedNodes, final long blocksize, final List<String> favoredNodes, - final byte storagePolicyID) throws IOException { + final byte storagePolicyID, + final boolean isStriped) throws IOException { List<DatanodeDescriptor> favoredDatanodeDescriptors = getDatanodeDescriptors(favoredNodes); final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(storagePolicyID); + final BlockPlacementPolicy blockplacement = placementPolicies.getPolicy(isStriped); final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src, numOfReplicas, client, excludedNodes, blocksize, favoredDatanodeDescriptors, storagePolicy); @@ -1600,55 +1752,59 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Parse the data-nodes the block belongs to and choose one, - * which will be the replication source. + * Parse the data-nodes the block belongs to and choose a certain number + * from them to be the recovery sources. * * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes * since the former do not have write traffic and hence are less busy. * We do not use already decommissioned nodes as a source. - * Otherwise we choose a random node among those that did not reach their - * replication limits. However, if the replication is of the highest priority - * and all nodes have reached their replication limits, we will choose a - * random node despite the replication limit. + * Otherwise we randomly choose nodes among those that did not reach their + * replication limits. However, if the recovery work is of the highest + * priority and all nodes have reached their replication limits, we will + * randomly choose the desired number of nodes despite the replication limit. * * In addition form a list of all nodes containing the block * and calculate its replication numbers. * * @param block Block for which a replication source is needed - * @param containingNodes List to be populated with nodes found to contain the - * given block - * @param nodesContainingLiveReplicas List to be populated with nodes found to - * contain live replicas of the given block - * @param numReplicas NumberReplicas instance to be initialized with the - * counts of live, corrupt, excess, and - * decommissioned replicas of the given - * block. + * @param containingNodes List to be populated with nodes found to contain + * the given block + * @param nodesContainingLiveReplicas List to be populated with nodes found + * to contain live replicas of the given + * block + * @param numReplicas NumberReplicas instance to be initialized with the + * counts of live, corrupt, excess, and decommissioned + * replicas of the given block. + * @param liveBlockIndices List to be populated with indices of healthy + * blocks in a striped block group * @param priority integer representing replication priority of the given * block - * @return the DatanodeDescriptor of the chosen node from which to replicate - * the given block - */ - @VisibleForTesting - DatanodeDescriptor chooseSourceDatanode(Block block, - List<DatanodeDescriptor> containingNodes, - List<DatanodeStorageInfo> nodesContainingLiveReplicas, - NumberReplicas numReplicas, - int priority) { + * @return the array of DatanodeDescriptor of the chosen nodes from which to + * recover the given block + */ + @VisibleForTesting + DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, + List<DatanodeDescriptor> containingNodes, + List<DatanodeStorageInfo> nodesContainingLiveReplicas, + NumberReplicas numReplicas, + List<Short> liveBlockIndices, int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); - DatanodeDescriptor srcNode = null; + List<DatanodeDescriptor> srcNodes = new ArrayList<>(); int live = 0; int decommissioned = 0; int decommissioning = 0; int corrupt = 0; int excess = 0; - + liveBlockIndices.clear(); + final boolean isStriped = block.isStriped(); + Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block); - for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { + for (DatanodeStorageInfo storage : blocksMap.getStorages(block)) { final DatanodeDescriptor node = storage.getDatanodeDescriptor(); - LightWeightLinkedSet<Block> excessBlocks = + LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get(node.getDatanodeUuid()); - int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; + int countableReplica = storage.getState() == State.NORMAL ? 1 : 0; if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) corrupt += countableReplica; else if (node.isDecommissionInProgress()) { @@ -1683,21 +1839,25 @@ public class BlockManager implements BlockStatsMXBean { if(node.isDecommissioned()) continue; - // We got this far, current node is a reasonable choice - if (srcNode == null) { - srcNode = node; + if(isStriped || srcNodes.isEmpty()) { + srcNodes.add(node); + if (isStriped) { + liveBlockIndices.add((short) ((BlockInfoStriped) block). + getStorageBlockIndex(storage)); + } continue; } - // switch to a different node randomly + // for replicated block, switch to a different node randomly // this to prevent from deterministically selecting the same node even // if the node failed to replicate the block on previous iterations - if(ThreadLocalRandom.current().nextBoolean()) - srcNode = node; + if (!isStriped && ThreadLocalRandom.current().nextBoolean()) { + srcNodes.set(0, node); + } } if(numReplicas != null) numReplicas.initialize(live, decommissioned, decommissioning, corrupt, excess, 0); - return srcNode; + return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]); } /** @@ -1761,25 +1921,41 @@ public class BlockManager implements BlockStatsMXBean { * reported by the datanode in the block report. */ static class StatefulBlockInfo { - final BlockInfoContiguousUnderConstruction storedBlock; + final BlockInfo storedBlock; // should be UC block final Block reportedBlock; final ReplicaState reportedState; - StatefulBlockInfo(BlockInfoContiguousUnderConstruction storedBlock, + StatefulBlockInfo(BlockInfo storedBlock, Block reportedBlock, ReplicaState reportedState) { + Preconditions.checkArgument( + storedBlock instanceof BlockInfoContiguousUnderConstruction || + storedBlock instanceof BlockInfoStripedUnderConstruction); this.storedBlock = storedBlock; this.reportedBlock = reportedBlock; this.reportedState = reportedState; } } - + + private static class BlockInfoToAdd { + final BlockInfo stored; + final Block reported; + + BlockInfoToAdd(BlockInfo stored, Block reported) { + this.stored = stored; + this.reported = reported; + } + } + /** * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a * list of blocks that should be considered corrupt due to a block report. */ private static class BlockToMarkCorrupt { - /** The corrupted block in a datanode. */ - final BlockInfo corrupted; + /** + * The corrupted block in a datanode. This is the one reported by the + * datanode. + */ + final Block corrupted; /** The corresponding block stored in the BlockManager. */ final BlockInfo stored; /** The reason to mark corrupt. */ @@ -1787,8 +1963,7 @@ public class BlockManager implements BlockStatsMXBean { /** The reason code to be stored */ final Reason reasonCode; - BlockToMarkCorrupt(BlockInfo corrupted, - BlockInfo stored, String reason, + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, String reason, Reason reasonCode) { Preconditions.checkNotNull(corrupted, "corrupted is null"); Preconditions.checkNotNull(stored, "stored is null"); @@ -1799,15 +1974,9 @@ public class BlockManager implements BlockStatsMXBean { this.reasonCode = reasonCode; } - BlockToMarkCorrupt(BlockInfo stored, String reason, - Reason reasonCode) { - this(stored, stored, reason, reasonCode); - } - - BlockToMarkCorrupt(BlockInfo stored, long gs, String reason, - Reason reasonCode) { - this(new BlockInfoContiguous((BlockInfoContiguous)stored), stored, - reason, reasonCode); + BlockToMarkCorrupt(Block corrupted, BlockInfo stored, long gs, + String reason, Reason reasonCode) { + this(corrupted, stored, reason, reasonCode); //the corrupted block in datanode has a different generation stamp corrupted.setGenerationStamp(gs); } @@ -1921,8 +2090,8 @@ public class BlockManager implements BlockStatsMXBean { metrics.addBlockReport((int) (endTime - startTime)); } blockLog.info("BLOCK* processReport: from storage {} node {}, " + - "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage - .getStorageID(), nodeID, newReport.getNumberOfBlocks(), + "blocks: {}, hasStaleStorage: {}, processing time: {} msecs", storage + .getStorageID(), nodeID, newReport.getNumberOfBlocks(), node.hasStaleStorages(), (endTime - startTime)); return !node.hasStaleStorages(); } @@ -1944,13 +2113,16 @@ public class BlockManager implements BlockStatsMXBean { // more than one storage on a datanode (and because it's a difficult // assumption to really enforce) removeStoredBlock(block, zombie.getDatanodeDescriptor()); - invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block); + Block b = getBlockOnStorage(block, zombie); + if (b != null) { + invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b); + } } assert(zombie.numBlocks() == 0); LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " + "which no longer exists on the DataNode.", - Long.toHexString(context.getReportId()), prevBlocks, - zombie.getStorageID()); + Long.toHexString(context.getReportId()), prevBlocks, + zombie.getStorageID()); } /** @@ -1993,8 +2165,7 @@ public class BlockManager implements BlockStatsMXBean { if (i >= blocksPerRescan) { break; } - - BlockInfo bi = blocksMap.getStoredBlock(b); + BlockInfo bi = getStoredBlock(b); if (bi == null) { if (LOG.isDebugEnabled()) { LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " + @@ -2034,11 +2205,11 @@ public class BlockManager implements BlockStatsMXBean { // Modify the (block-->datanode) map, according to the difference // between the old and new block report. // - Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>(); - Collection<Block> toRemove = new TreeSet<Block>(); - Collection<Block> toInvalidate = new LinkedList<Block>(); - Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>(); - Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>(); + Collection<BlockInfoToAdd> toAdd = new LinkedList<>(); + Collection<BlockInfo> toRemove = new TreeSet<>(); + Collection<Block> toInvalidate = new LinkedList<>(); + Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>(); + Collection<StatefulBlockInfo> toUC = new LinkedList<>(); reportDiff(storageInfo, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC); @@ -2047,12 +2218,13 @@ public class BlockManager implements BlockStatsMXBean { for (StatefulBlockInfo b : toUC) { addStoredBlockUnderConstruction(b, storageInfo); } - for (Block b : toRemove) { + for (BlockInfo b : toRemove) { removeStoredBlock(b, node); } int numBlocksLogged = 0; - for (BlockInfo b : toAdd) { - addStoredBlock(b, storageInfo, null, numBlocksLogged < maxNumBlocksToLog); + for (BlockInfoToAdd b : toAdd) { + addStoredBlock(b.stored, b.reported, storageInfo, null, + numBlocksLogged < maxNumBlocksToLog); numBlocksLogged++; } if (numBlocksLogged > maxNumBlocksToLog) { @@ -2073,17 +2245,18 @@ public class BlockManager implements BlockStatsMXBean { * Mark block replicas as corrupt except those on the storages in * newStorages list. */ - public void markBlockReplicasAsCorrupt(BlockInfo block, + public void markBlockReplicasAsCorrupt(Block oldBlock, + BlockInfo block, long oldGenerationStamp, long oldNumBytes, DatanodeStorageInfo[] newStorages) throws IOException { assert namesystem.hasWriteLock(); BlockToMarkCorrupt b = null; if (block.getGenerationStamp() != oldGenerationStamp) { - b = new BlockToMarkCorrupt(block, oldGenerationStamp, + b = new BlockToMarkCorrupt(oldBlock, block, oldGenerationStamp, "genstamp does not match " + oldGenerationStamp + " : " + block.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); } else if (block.getNumBytes() != oldNumBytes) { - b = new BlockToMarkCorrupt(block, + b = new BlockToMarkCorrupt(oldBlock, block, "length does not match " + oldNumBytes + " : " + block.getNumBytes(), Reason.SIZE_MISMATCH); } else { @@ -2141,8 +2314,8 @@ public class BlockManager implements BlockStatsMXBean { QUEUE_REASON_FUTURE_GENSTAMP); continue; } - - BlockInfo storedBlock = blocksMap.getStoredBlock(iblk); + + BlockInfo storedBlock = getStoredBlock(iblk); // If block does not belong to any file, we are done. if (storedBlock == null) continue; @@ -2165,38 +2338,38 @@ public class BlockManager implements BlockStatsMXBean { // If block is under construction, add this replica to its list if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - ((BlockInfoContiguousUnderConstruction)storedBlock) - .addReplicaIfNotPresent(storageInfo, iblk, reportedState); + final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)storedBlock; + uc.addReplicaIfNotPresent(storageInfo, iblk, reportedState); // OpenFileBlocks only inside snapshots also will be added to safemode // threshold. So we need to update such blocks to safemode // refer HDFS-5283 - BlockInfoContiguousUnderConstruction blockUC = - (BlockInfoContiguousUnderConstruction) storedBlock; - if (namesystem.isInSnapshot(blockUC)) { - int numOfReplicas = blockUC.getNumExpectedLocations(); - namesystem.incrementSafeBlockCount(numOfReplicas); + if (namesystem.isInSnapshot(storedBlock.getBlockCollection())) { + int numOfReplicas = uc.getNumExpectedLocations(); + namesystem.incrementSafeBlockCount(numOfReplicas, storedBlock); } //and fall through to next clause } //add replica if appropriate if (reportedState == ReplicaState.FINALIZED) { - addStoredBlockImmediate(storedBlock, storageInfo); + addStoredBlockImmediate(storedBlock, iblk, storageInfo); } } } private void reportDiff(DatanodeStorageInfo storageInfo, - BlockListAsLongs newReport, - Collection<BlockInfo> toAdd, // add to DatanodeDescriptor - Collection<Block> toRemove, // remove from DatanodeDescriptor + BlockListAsLongs newReport, + Collection<BlockInfoToAdd> toAdd, // add to DatanodeDescriptor + Collection<BlockInfo> toRemove, // remove from DatanodeDescriptor Collection<Block> toInvalidate, // should be removed from DN Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list Collection<StatefulBlockInfo> toUC) { // add to under-construction list // place a delimiter in the list which separates blocks // that have been reported from those that have not - BlockInfo delimiter = new BlockInfoContiguous(new Block(), (short) 1); - AddBlockResult result = storageInfo.addBlock(delimiter); + Block delimiterBlock = new Block(); + BlockInfo delimiter = new BlockInfoContiguous(delimiterBlock, + (short) 1); + AddBlockResult result = storageInfo.addBlock(delimiter, delimiterBlock); assert result == AddBlockResult.ADDED : "Delimiting block cannot be present in the node"; int headIndex = 0; //currently the delimiter is in the head of the list @@ -2222,8 +2395,9 @@ public class BlockManager implements BlockStatsMXBean { // all of them are next to the delimiter Iterator<BlockInfo> it = storageInfo.new BlockIterator(delimiter.getNext(0)); - while(it.hasNext()) + while(it.hasNext()) { toRemove.add(it.next()); + } storageInfo.removeBlock(delimiter); } @@ -2260,9 +2434,9 @@ public class BlockManager implements BlockStatsMXBean { */ private BlockInfo processReportedBlock( final DatanodeStorageInfo storageInfo, - final Block block, final ReplicaState reportedState, - final Collection<BlockInfo> toAdd, - final Collection<Block> toInvalidate, + final Block block, final ReplicaState reportedState, + final Collection<BlockInfoToAdd> toAdd, + final Collection<Block> toInvalidate, final Collection<BlockToMarkCorrupt> toCorrupt, final Collection<StatefulBlockInfo> toUC) { @@ -2282,7 +2456,7 @@ public class BlockManager implements BlockStatsMXBean { } // find block by blockId - BlockInfo storedBlock = blocksMap.getStoredBlock(block); + BlockInfo storedBlock = getStoredBlock(block); if(storedBlock == null) { // If blocksMap does not contain reported block id, // the replica should be removed from the data-node. @@ -2325,9 +2499,8 @@ public class BlockManager implements BlockStatsMXBean { } if (isBlockUnderConstruction(storedBlock, ucState, reportedState)) { - toUC.add(new StatefulBlockInfo( - (BlockInfoContiguousUnderConstruction) storedBlock, - new Block(block), reportedState)); + toUC.add(new StatefulBlockInfo(storedBlock, new Block(block), + reportedState)); return storedBlock; } @@ -2336,7 +2509,7 @@ public class BlockManager implements BlockStatsMXBean { if (reportedState == ReplicaState.FINALIZED && (storedBlock.findStorageInfo(storageInfo) == -1 || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { - toAdd.add(storedBlock); + toAdd.add(new BlockInfoToAdd(storedBlock, block)); } return storedBlock; } @@ -2382,7 +2555,7 @@ public class BlockManager implements BlockStatsMXBean { if (rbi.getReportedState() == null) { // This is a DELETE_BLOCK request DatanodeStorageInfo storageInfo = rbi.getStorageInfo(); - removeStoredBlock(rbi.getBlock(), + removeStoredBlock(getStoredBlock(rbi.getBlock()), storageInfo.getDatanodeDescriptor()); } else { processAndHandleReportedBlock(rbi.getStorageInfo(), @@ -2430,12 +2603,27 @@ public class BlockManager implements BlockStatsMXBean { case COMMITTED: if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS, "block is " + ucState + " and reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); - } else if (storedBlock.getNumBytes() != reported.getNumBytes()) { - return new BlockToMarkCorrupt(storedBlock, + } + boolean wrongSize; + if (storedBlock.isStriped()) { + assert BlockIdManager.isStripedBlockID(reported.getBlockId()); + assert storedBlock.getBlockId() == + BlockIdManager.convertToStripedID(reported.getBlockId()); + BlockInfoStriped stripedBlock = (BlockInfoStriped) storedBlock; + int reportedBlkIdx = BlockIdManager.getBlockIndex(reported); + wrongSize = reported.getNumBytes() != + getInternalBlockLength(stripedBlock.getNumBytes(), + BLOCK_STRIPED_CELL_SIZE, + stripedBlock.getDataBlockNum(), reportedBlkIdx); + } else { + wrongSize = storedBlock.getNumBytes() != reported.getNumBytes(); + } + if (wrongSize) { + return new BlockToMarkCorrupt(new Block(reported), storedBlock, "block is " + ucState + " and reported length " + reported.getNumBytes() + " does not match " + "length in block map " + storedBlock.getNumBytes(), @@ -2446,8 +2634,8 @@ public class BlockManager implements BlockStatsMXBean { case UNDER_CONSTRUCTION: if (storedBlock.getGenerationStamp() > reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, "block is " - + ucState + " and reported state " + reportedState + return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS, + "block is " + ucState + " and reported state " + reportedState + ", But reported genstamp " + reportedGS + " does not match genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2462,7 +2650,7 @@ public class BlockManager implements BlockStatsMXBean { return null; // not corrupt } else if (storedBlock.getGenerationStamp() != reported.getGenerationStamp()) { final long reportedGS = reported.getGenerationStamp(); - return new BlockToMarkCorrupt(storedBlock, reportedGS, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, reportedGS, "reported " + reportedState + " replica with genstamp " + reportedGS + " does not match COMPLETE block's genstamp in block map " + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH); @@ -2477,7 +2665,7 @@ public class BlockManager implements BlockStatsMXBean { "complete with the same genstamp"); return null; } else { - return new BlockToMarkCorrupt(storedBlock, + return new BlockToMarkCorrupt(new Block(reported), storedBlock, "reported replica has invalid state " + reportedState, Reason.INVALID_STATE); } @@ -2490,7 +2678,8 @@ public class BlockManager implements BlockStatsMXBean { " on " + dn + " size " + storedBlock.getNumBytes(); // log here at WARN level since this is really a broken HDFS invariant LOG.warn(msg); - return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE); + return new BlockToMarkCorrupt(new Block(reported), storedBlock, msg, + Reason.INVALID_STATE); } } @@ -2517,13 +2706,14 @@ public class BlockManager implements BlockStatsMXBean { void addStoredBlockUnderConstruction(StatefulBlockInfo ucBlock, DatanodeStorageInfo storageInfo) throws IOException { - BlockInfoContiguousUnderConstruction block = ucBlock.storedBlock; - block.addReplicaIfNotPresent( - storageInfo, ucBlock.reportedBlock, ucBlock.reportedState); + BlockInfo block = ucBlock.storedBlock; + final BlockInfoUnderConstruction uc = (BlockInfoUnderConstruction)block; + uc.addReplicaIfNotPresent(storageInfo, ucBlock.reportedBlock, + ucBlock.reportedState); if (ucBlock.reportedState == ReplicaState.FINALIZED && (block.findStorageInfo(storageInfo) < 0)) { - addStoredBlock(block, storageInfo, null, true); + addStoredBlock(block, ucBlock.reportedBlock, storageInfo, null, true); } } @@ -2538,39 +2728,40 @@ public class BlockManager implements BlockStatsMXBean { * * @throws IOException */ - private void addStoredBlockImmediate(BlockInfo storedBlock, + private void addStoredBlockImmediate(BlockInfo storedBlock, Block reported, DatanodeStorageInfo storageInfo) throws IOException { assert (storedBlock != null && namesystem.hasWriteLock()); if (!namesystem.isInStartupSafeMode() || namesystem.isPopulatingReplQueues()) { - addStoredBlock(storedBlock, storageInfo, null, false); + addStoredBlock(storedBlock, reported, storageInfo, null, false); return; } // just add it - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reported); // Now check for completion of blocks and safe block count int numCurrentReplica = countLiveNodes(storedBlock); if (storedBlock.getBlockUCState() == BlockUCState.COMMITTED - && numCurrentReplica >= minReplication) { + && hasMinStorage(storedBlock, numCurrentReplica)) { completeBlock(storedBlock.getBlockCollection(), storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block // only complete blocks are counted towards that. // In the case that the block just became complete above, completeBlock() // handles the safe block count maintenance. - namesystem.incrementSafeBlockCount(numCurrentReplica); + namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock); } } /** * Modify (block-->datanode) map. Remove block from set of * needed replications if this takes care of the problem. - * @return the block that is stored in blockMap. + * @return the block that is stored in blocksMap. */ private Block addStoredBlock(final BlockInfo block, + final Block reportedBlock, DatanodeStorageInfo storageInfo, DatanodeDescriptor delNodeHint, boolean logEveryBlock) @@ -2578,9 +2769,10 @@ public class BlockManager implements BlockStatsMXBean { assert block != null && namesystem.hasWriteLock(); BlockInfo storedBlock; DatanodeDescriptor node = storageInfo.getDatanodeDescriptor(); - if (block instanceof BlockInfoContiguousUnderConstruction) { + if (block instanceof BlockInfoContiguousUnderConstruction || + block instanceof BlockInfoStripedUnderConstruction) { //refresh our copy in case the block got completed in another thread - storedBlock = blocksMap.getStoredBlock(block); + storedBlock = getStoredBlock(block); } else { storedBlock = block; } @@ -2594,10 +2786,9 @@ public class BlockManager implements BlockStatsMXBean { return block; } BlockCollection bc = storedBlock.getBlockCollection(); - assert bc != null : "Block must belong to a file"; // add block to the datanode - AddBlockResult result = storageInfo.addBlock(storedBlock); + AddBlockResult result = storageInfo.addBlock(storedBlock, reportedBlock); int curReplicaDelta; if (result == AddBlockResult.ADDED) { @@ -2628,7 +2819,7 @@ public class BlockManager implements BlockStatsMXBean { + pendingReplications.getNumReplicas(storedBlock); if(storedBlock.getBlockUCState() == BlockUCState.COMMITTED && - numLiveReplicas >= minReplication) { + hasMinStorage(storedBlock, numLiveReplicas)) { storedBlock = completeBlock(bc, storedBlock, false); } else if (storedBlock.isComplete() && result == AddBlockResult.ADDED) { // check whether safe replication is reached for the block @@ -2636,7 +2827,7 @@ public class BlockManager implements BlockStatsMXBean { // Is no-op if not in safe mode. // In the case that the block just became complete above, completeBlock() // handles the safe block count maintenance. - namesystem.incrementSafeBlockCount(numCurrentReplica); + namesystem.incrementSafeBlockCount(numCurrentReplica, storedBlock); } // if file is under construction, then done for now @@ -2650,7 +2841,7 @@ public class BlockManager implements BlockStatsMXBean { } // handle underReplication/overReplication - short fileReplication = bc.getPreferredBlockReplication(); + short fileReplication = getExpectedReplicaNum(bc, storedBlock); if (!isNeededReplication(storedBlock, fileReplication, numCurrentReplica)) { neededReplications.remove(storedBlock, numCurrentReplica, num.decommissionedAndDecommissioning(), fileReplication); @@ -2666,11 +2857,12 @@ public class BlockManager implements BlockStatsMXBean { int numCorruptNodes = num.corruptReplicas(); if (numCorruptNodes != corruptReplicasCount) { LOG.warn("Inconsistent number of corrupt replicas for " + - storedBlock + "blockMap has " + numCorruptNodes + + storedBlock + ". blockMap has " + numCorruptNodes + " but corrupt replicas map has " + corruptReplicasCount); } - if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) - invalidateCorruptReplicas(storedBlock); + if ((corruptReplicasCount > 0) && (numLiveReplicas >= fileReplication)) { + invalidateCorruptReplicas(storedBlock, reportedBlock, num); + } return storedBlock; } @@ -2702,18 +2894,20 @@ public class BlockManager implements BlockStatsMXBean { * * @param blk Block whose corrupt replicas need to be invalidated */ - private void invalidateCorruptReplicas(BlockInfo blk) { + private void invalidateCorruptReplicas(BlockInfo blk, Block reported, + NumberReplicas numberReplicas) { Collection<DatanodeDescriptor> nodes = corruptReplicas.getNodes(blk); boolean removedFromBlocksMap = true; if (nodes == null) return; // make a copy of the array of nodes in order to avoid // ConcurrentModificationException, when the block is removed from the node - DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]); + DatanodeDescriptor[] nodesCopy = + nodes.toArray(new DatanodeDescriptor[nodes.size()]); for (DatanodeDescriptor node : nodesCopy) { try { - if (!invalidateBlock(new BlockToMarkCorrupt(blk, null, - Reason.ANY), node)) { + if (!invalidateBlock(new BlockToMarkCorrupt(reported, blk, null, + Reason.ANY), node, numberReplicas)) { removedFromBlocksMap = false; } } catch (IOException e) { @@ -2864,6 +3058,15 @@ public class BlockManager implements BlockStatsMXBean { } /** + * Get the value of whether there are any non-EC blocks using StripedID. + * + * @return Returns the value of whether there are any non-EC blocks using StripedID. + */ + public boolean hasNonEcBlockUsingStripedID(){ + return hasNonEcBlockUsingStripedID; + } + + /** * Process a single possibly misreplicated block. This adds it to the * appropriate queues if necessary, and returns a result code indicating * what happened with it. @@ -2881,7 +3084,7 @@ public class BlockManager implements BlockStatsMXBean { } // calculate current replication short expectedReplication = - block.getBlockCollection().getPreferredBlockReplication(); + getExpectedReplicaNum(block.getBlockCollection(), block); NumberReplicas num = countNodes(block); int numCurrentReplica = num.liveReplicas(); // add to under-replicated queue if need to be @@ -2940,14 +3143,14 @@ public class BlockManager implements BlockStatsMXBean { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. */ - private void processOverReplicatedBlock(final Block block, + private void processOverReplicatedBlock(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); if (addedNode == delNodeHint) { delNodeHint = null; } - Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>(); + Collection<DatanodeStorageInfo> nonExcess = new ArrayList<>(); Collection<DatanodeDescriptor> corruptNodes = corruptReplicas .getNodes(block); for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) { @@ -2961,8 +3164,8 @@ public class BlockManager implements BlockStatsMXBean { postponeBlock(block); return; } - LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur - .getDatanodeUuid()); + LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get( + cur.getDatanodeUuid()); if (excessBlocks == null || !excessBlocks.contains(block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas @@ -2972,10 +3175,29 @@ public class BlockManager implements BlockStatsMXBean { } } } - chooseExcessReplicates(nonExcess, block, replication, - addedNode, delNodeHint, blockplacement); + chooseExcessReplicates(nonExcess, block, replication, addedNode, + delNodeHint); } + private void chooseExcessReplicates( + final Collection<DatanodeStorageInfo> nonExcess, + BlockInfo storedBlock, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint) { + assert namesystem.hasWriteLock(); + // first form a rack to datanodes map and + BlockCollection bc = getBlockCollection(storedBlock); + if (storedBlock.isStriped()) { + chooseExcessReplicasStriped(bc, nonExcess, storedBlock, delNodeHint); + } else { + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); + final List<StorageType> excessTypes = storagePolicy.chooseExcess( + replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); + chooseExcessReplicasContiguous(bc, nonExcess, storedBlock, + replication, addedNode, delNodeHint, excessTypes); + } + } /** * We want "replication" replicates for the block, but we now have too many. @@ -2991,23 +3213,16 @@ public class BlockManager implements BlockStatsMXBean { * If no such a node is available, * then pick a node with least free space */ - private void chooseExcessReplicates(final Collection<DatanodeStorageInfo> nonExcess, - Block b, short replication, - DatanodeDescriptor addedNode, - DatanodeDescriptor delNodeHint, - BlockPlacementPolicy replicator) { - assert namesystem.hasWriteLock(); - // first form a rack to datanodes map and - BlockCollection bc = getBlockCollection(b); - final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID()); - final List<StorageType> excessTypes = storagePolicy.chooseExcess( - replication, DatanodeStorageInfo.toStorageTypes(nonExcess)); - - - final Map<String, List<DatanodeStorageInfo>> rackMap - = new HashMap<String, List<DatanodeStorageInfo>>(); - final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>(); - final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>(); + private void chooseExcessReplicasContiguous(BlockCollection bc, + final Collection<DatanodeStorageInfo> nonExcess, + BlockInfo storedBlock, short replication, + DatanodeDescriptor addedNode, + DatanodeDescriptor delNodeHint, + List<StorageType> excessTypes) { + BlockPlacementPolicy replicator = placementPolicies.getPolicy(false); + final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>(); + final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); + final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); // split nodes into two sets // moreThanOne contains nodes on rack with more than one replica @@ -3028,33 +3243,111 @@ public class BlockManager implements BlockStatsMXBean { moreThanOne, excessTypes)) { cur = delNodeHintStorage; } else { // regular excessive replica removal - cur = replicator.chooseReplicaToDelete(bc, b, replication, + cur = replicator.chooseReplicaToDelete(bc, storedBlock, replication, moreThanOne, exactlyOne, excessTypes); } firstOne = false; - // adjust rackmap, moreThanOne, and exactlyOne replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); - nonExcess.remove(cur); - addToExcessReplicate(cur.getDatanodeDescriptor(), b); + processChosenExcessReplica(nonExcess, cur, storedBlock); + } + } - // - // The 'excessblocks' tracks blocks until we get confirmation - // that the datanode has deleted them; the only way we remove them - // is when we get a "removeBlock" message. - // - // The 'invalidate' list is used to inform the datanode the block - // should be deleted. Items are removed from the invalidate list - // upon giving instructions to the namenode. - // - addToInvalidates(b, cur.getDatanodeDescriptor()); - blockLog.debug("BLOCK* chooseExcessReplicates: " - +"({}, {}) is added to invalidated blocks set", cur, b); + /** + * We want block group has every internal block, but we have redundant + * internal blocks (which have the same index). + * In this method, we delete the redundant internal blocks until only one + * left for each index. + * + * The block placement policy will make sure that the left internal blocks are + * spread across racks and also try hard to pick one with least free space. + */ + private void chooseExcessReplicasStriped(BlockCollection bc, + final Collection<DatanodeStorageInfo> nonExcess, + BlockInfo storedBlock, + DatanodeDescriptor delNodeHint) { + assert storedBlock instanceof BlockInfoStriped; + BlockInfoStriped sblk = (BlockInfoStriped) storedBlock; + short groupSize = sblk.getTotalBlockNum(); + BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(true); + List<DatanodeStorageInfo> empty = new ArrayList<>(0); + + // find all duplicated indices + BitSet found = new BitSet(groupSize); //indices found + BitSet duplicated = new BitSet(groupSize); //indices found more than once + HashMap<DatanodeStorageInfo, Integer> storage2index = new HashMap<>(); + for (DatanodeStorageInfo storage : nonExcess) { + int index = sblk.getStorageBlockIndex(storage); + assert index >= 0; + if (found.get(index)) { + duplicated.set(index); + } + found.set(index); + storage2index.put(storage, index); + } + // the number of target left replicas equals to the of number of the found + // indices. + int numOfTarget = found.cardinality(); + + final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy( + bc.getStoragePolicyID()); + final List<StorageType> excessTypes = storagePolicy.chooseExcess( + (short)numOfTarget, DatanodeStorageInfo.toStorageTypes(nonExcess)); + + // use delHint only if delHint is duplicated + final DatanodeStorageInfo delStorageHint = + DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint); + if (delStorageHint != null) { + Integer index = storage2index.get(delStorageHint); + if (index != null && duplicated.get(index)) { + processChosenExcessReplica(nonExcess, delStorageHint, storedBlock); + } + } + + // for each duplicated index, delete some replicas until only one left + for (int targetIndex = duplicated.nextSetBit(0); targetIndex >= 0; + targetIndex = duplicated.nextSetBit(targetIndex + 1)) { + List<DatanodeStorageInfo> candidates = new ArrayList<>(); + for (DatanodeStorageInfo storage : nonExcess) { + int index = storage2index.get(storage); + if (index == targetIndex) { + candidates.add(storage); + } + } + Block internalBlock = new Block(storedBlock); + internalBlock.setBlockId(storedBlock.getBlockId() + targetIndex); + while (candidates.size() > 1) { + DatanodeStorageInfo target = placementPolicy.chooseReplicaToDelete(bc, + internalBlock, (short)1, candidates, empty, excessTypes); + processChosenExcessReplica(nonExcess, target, storedBlock); + candidates.remove(target); + } + duplicated.clear(targetIndex); } } + private void processChosenExcessReplica( + final Collection<DatanodeStorageInfo> nonExcess, + final DatanodeStorageInfo chosen, BlockInfo storedBlock) { + nonExcess.remove(chosen); + addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); + // + // The 'excessblocks' tracks blocks until we get confirmation + // that the datanode has deleted them; the only way we remove them + // is when we get a "removeBlock" message. + // + // The 'invalidate' list is used to inform the datanode the block + // should be deleted. Items are removed from the invalidate list + // upon giving instructions to the datanodes. + // + final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen); + addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor()); + blockLog.debug("BLOCK* chooseExcessReplicates: " + + "({}, {}) is added to invalidated blocks set", chosen, storedBlock); + } + /** Check if we can use delHint */ static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, @@ -3076,17 +3369,18 @@ public class BlockManager implements BlockStatsMXBean { } } - private void addToExcessReplicate(DatanodeInfo dn, Block block) { + private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) { assert namesystem.hasWriteLock(); - LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getDatanodeUuid()); + LightWeightLinkedSet<BlockInfo> excessBlocks = excessReplicateMap.get( + dn.getDatanodeUuid()); if (excessBlocks == null) { - excessBlocks = new LightWeightLinkedSet<Block>(); + excessBlocks = new LightWeightLinkedSet<>(); excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); } - if (excessBlocks.add(block)) { + if (excessBlocks.add(storedBlock)) { excessBlocksCount.incrementAndGet(); blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, block); + + " excessReplicateMap", dn, storedBlock); } } @@ -3098,26 +3392,25 @@ public class BlockManager implements BlockStatsMXBean { QUEUE_REASON_FUTURE_GENSTAMP); return; } - removeStoredBlock(block, node); + removeStoredBlock(getStoredBlock(block), node); } /** * Modify (block-->datanode) map. Possibly generate replication tasks, if the * removed block is still valid. */ - public void removeStoredBlock(Block block, DatanodeDescriptor node) { - blockLog.debug("BLOCK* removeStoredBlock: {} from {}", block, node); + public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { + blockLog.debug("BLOCK* removeStoredBlock: {} from {}", storedBlock, node); assert (namesystem.hasWriteLock()); { - BlockInfo storedBlock = getStoredBlock(block); if (storedBlock == null || !blocksMap.removeNode(storedBlock, node)) { blockLog.debug("BLOCK* removeStoredBlock: {} has already been" + - " removed from node {}", block, node); + " removed from node {}", storedBlock, node); return; } CachedBlock cblock = namesystem.getCacheManager().getCachedBlocks() - .get(new CachedBlock(block.getBlockId(), (short) 0, false)); + .get(new CachedBlock(storedBlock.getBlockId(), (short) 0, false)); if (cblock != null) { boolean removed = false; removed |= node.getPendingCached().remove(cblock); @@ -3125,7 +3418,7 @@ public class BlockManager implements BlockStatsMXBean { removed |= node.getPendingUncached().remove(cblock); if (removed) { blockLog.debug("BLOCK* removeStoredBlock: {} removed from caching " - + "related lists on node {}", block, node); + + "related lists on node {}", storedBlock, node); } } @@ -3135,7 +3428,7 @@ public class BlockManager implements BlockStatsMXBean { // necessary. In that case, put block on a possibly-will- // be-replicated list. // - BlockCollection bc = blocksMap.getBlockCollection(block); + BlockCollection bc = storedBlock.getBlockCollection(); if (bc != null) { namesystem.decrementS
<TRUNCATED>