HDFS-9857. Erasure Coding: Rename replication-based names in BlockManager to more generic [part-1]. Contributed by Rakesh R.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/32d043d9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/32d043d9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/32d043d9 Branch: refs/heads/HDFS-7240 Commit: 32d043d9c5f4615058ea4f65a58ba271ba47fcb5 Parents: 605fdcb Author: Zhe Zhang <zezh...@zezhang-ld1.linkedin.biz> Authored: Wed Mar 16 16:53:58 2016 -0700 Committer: Zhe Zhang <zezh...@zezhang-ld1.linkedin.biz> Committed: Wed Mar 16 16:53:58 2016 -0700 ---------------------------------------------------------------------- .../server/blockmanagement/BlockManager.java | 268 +++++------ .../blockmanagement/DecommissionManager.java | 30 +- .../blockmanagement/LowRedundancyBlocks.java | 458 +++++++++++++++++++ .../blockmanagement/UnderReplicatedBlocks.java | 448 ------------------ .../blockmanagement/BlockManagerTestUtil.java | 2 +- .../blockmanagement/TestBlockManager.java | 20 +- .../TestLowRedundancyBlockQueues.java | 182 ++++++++ .../blockmanagement/TestPendingReplication.java | 14 +- .../blockmanagement/TestReplicationPolicy.java | 158 +++---- .../TestUnderReplicatedBlockQueues.java | 182 -------- .../hdfs/server/namenode/TestMetaSave.java | 2 +- 11 files changed, 891 insertions(+), 873 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 6ed102c..66ab789 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -149,7 +149,7 @@ public class BlockManager implements BlockStatsMXBean { private volatile long pendingReplicationBlocksCount = 0L; private volatile long corruptReplicaBlocksCount = 0L; - private volatile long underReplicatedBlocksCount = 0L; + private volatile long lowRedundancyBlocksCount = 0L; private volatile long scheduledReplicationBlocksCount = 0L; /** flag indicating whether replication queues have been initialized */ @@ -166,7 +166,7 @@ public class BlockManager implements BlockStatsMXBean { } /** Used by metrics */ public long getUnderReplicatedBlocksCount() { - return underReplicatedBlocksCount; + return lowRedundancyBlocksCount; } /** Used by metrics */ public long getCorruptReplicaBlocksCount() { @@ -250,9 +250,10 @@ public class BlockManager implements BlockStatsMXBean { /** * Store set of Blocks that need to be replicated 1 or more times. - * We also store pending replication-orders. + * We also store pending reconstruction-orders. */ - public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks(); + public final LowRedundancyBlocks neededReconstruction = + new LowRedundancyBlocks(); @VisibleForTesting final PendingReplicationBlocks pendingReplications; @@ -294,20 +295,20 @@ public class BlockManager implements BlockStatsMXBean { private boolean shouldPostponeBlocksFromFuture = false; /** - * Process replication queues asynchronously to allow namenode safemode exit - * and failover to be faster. HDFS-5496 + * Process reconstruction queues asynchronously to allow namenode safemode + * exit and failover to be faster. HDFS-5496. 
*/ - private Daemon replicationQueuesInitializer = null; + private Daemon reconstructionQueuesInitializer = null; /** - * Number of blocks to process asychronously for replication queues + * Number of blocks to process asychronously for reconstruction queues * initialization once aquired the namesystem lock. Remaining blocks will be * processed again after aquiring lock again. */ private int numBlocksPerIteration; /** - * Progress of the Replication queues initialisation. + * Progress of the Reconstruction queues initialisation. */ - private double replicationQueuesInitProgress = 0.0; + private double reconstructionQueuesInitProgress = 0.0; /** for block replicas placement */ private BlockPlacementPolicies placementPolicies; @@ -576,12 +577,12 @@ public class BlockManager implements BlockStatsMXBean { out.println("Live Datanodes: " + live.size()); out.println("Dead Datanodes: " + dead.size()); // - // Dump contents of neededReplication + // Dump contents of neededReconstruction // - synchronized (neededReplications) { - out.println("Metasave: Blocks waiting for replication: " + - neededReplications.size()); - for (Block block : neededReplications) { + synchronized (neededReconstruction) { + out.println("Metasave: Blocks waiting for reconstruction: " + + neededReconstruction.size()); + for (Block block : neededReconstruction) { dumpBlockMeta(block, out); } } @@ -616,7 +617,7 @@ public class BlockManager implements BlockStatsMXBean { // source node returned is not used chooseSourceDatanodes(getStoredBlock(block), containingNodes, containingLiveReplicasNodes, numReplicas, - new LinkedList<Byte>(), UnderReplicatedBlocks.LEVEL); + new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL); // containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are // not included in the numReplicas.liveReplicas() count @@ -849,9 +850,9 @@ public class BlockManager implements BlockStatsMXBean { // is happening bc.convertLastBlockToUC(lastBlock, targets); - // Remove block 
from replication queue. + // Remove block from reconstruction queue. NumberReplicas replicas = countNodes(lastBlock); - neededReplications.remove(lastBlock, replicas.liveReplicas(), + neededReconstruction.remove(lastBlock, replicas.liveReplicas(), replicas.readOnlyReplicas(), replicas.decommissionedAndDecommissioning(), getReplication(lastBlock)); pendingReplications.remove(lastBlock); @@ -1365,8 +1366,8 @@ public class BlockManager implements BlockStatsMXBean { // the block is over-replicated so invalidate the replicas immediately invalidateBlock(b, node, numberOfReplicas); } else if (isPopulatingReplQueues()) { - // add the block to neededReplication - updateNeededReplications(b.getStored(), -1, 0); + // add the block to neededReconstruction + updateNeededReconstructions(b.getStored(), -1, 0); } } @@ -1418,13 +1419,13 @@ public class BlockManager implements BlockStatsMXBean { void updateState() { pendingReplicationBlocksCount = pendingReplications.size(); - underReplicatedBlocksCount = neededReplications.size(); + lowRedundancyBlocksCount = neededReconstruction.size(); corruptReplicaBlocksCount = corruptReplicas.size(); } - /** Return number of under-replicated but not missing blocks */ + /** Return number of low redundancy blocks but not missing blocks. */ public int getUnderReplicatedNotMissingBlocks() { - return neededReplications.getUnderReplicatedBlockCount(); + return neededReconstruction.getLowRedundancyBlockCount(); } /** @@ -1452,25 +1453,26 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Scan blocks in {@link #neededReplications} and assign reconstruction + * Scan blocks in {@link #neededReconstruction} and assign reconstruction * (replication or erasure coding) work to data-nodes they belong to. * * The number of process blocks equals either twice the number of live - * data-nodes or the number of under-replicated blocks whichever is less. + * data-nodes or the number of low redundancy blocks whichever is less. 
* - * @return number of blocks scheduled for replication during this iteration. + * @return number of blocks scheduled for reconstruction during this + * iteration. */ int computeBlockReconstructionWork(int blocksToProcess) { - List<List<BlockInfo>> blocksToReplicate = null; + List<List<BlockInfo>> blocksToReconstruct = null; namesystem.writeLock(); try { - // Choose the blocks to be replicated - blocksToReplicate = neededReplications - .chooseUnderReplicatedBlocks(blocksToProcess); + // Choose the blocks to be reconstructed + blocksToReconstruct = neededReconstruction + .chooseLowRedundancyBlocks(blocksToProcess); } finally { namesystem.writeUnlock(); } - return computeReconstructionWorkForBlocks(blocksToReplicate); + return computeReconstructionWorkForBlocks(blocksToReconstruct); } /** @@ -1489,7 +1491,7 @@ public class BlockManager implements BlockStatsMXBean { // Step 1: categorize at-risk blocks into replication and EC tasks namesystem.writeLock(); try { - synchronized (neededReplications) { + synchronized (neededReconstruction) { for (int priority = 0; priority < blocksToReconstruct .size(); priority++) { for (BlockInfo block : blocksToReconstruct.get(priority)) { @@ -1533,7 +1535,7 @@ public class BlockManager implements BlockStatsMXBean { continue; } - synchronized (neededReplications) { + synchronized (neededReconstruction) { if (validateReconstructionWork(rw)) { scheduledWork++; } @@ -1544,7 +1546,7 @@ public class BlockManager implements BlockStatsMXBean { } if (blockLog.isDebugEnabled()) { - // log which blocks have been scheduled for replication + // log which blocks have been scheduled for reconstruction for(BlockReconstructionWork rw : reconWork){ DatanodeStorageInfo[] targets = rw.getTargets(); if (targets != null && targets.length != 0) { @@ -1558,8 +1560,9 @@ public class BlockManager implements BlockStatsMXBean { } } - blockLog.debug("BLOCK* neededReplications = {} pendingReplications = {}", - neededReplications.size(), 
pendingReplications.size()); + blockLog.debug( + "BLOCK* neededReconstruction = {} pendingReplications = {}", + neededReconstruction.size(), pendingReplications.size()); } return scheduledWork; @@ -1576,8 +1579,8 @@ public class BlockManager implements BlockStatsMXBean { int priority) { // skip abandoned block or block reopened for append if (block.isDeleted() || !block.isCompleteOrCommitted()) { - // remove from neededReplications - neededReplications.remove(block, priority); + // remove from neededReconstruction + neededReconstruction.remove(block, priority); return null; } @@ -1605,8 +1608,8 @@ public class BlockManager implements BlockStatsMXBean { int pendingNum = pendingReplications.getNumReplicas(block); if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, requiredReplication)) { - neededReplications.remove(block, priority); - blockLog.debug("BLOCK* Removing {} from neededReplications as" + + neededReconstruction.remove(block, priority); + blockLog.debug("BLOCK* Removing {} from neededReconstruction as" + " it has enough replicas", block); return null; } @@ -1662,7 +1665,7 @@ public class BlockManager implements BlockStatsMXBean { // Recheck since global lock was released // skip abandoned block or block reopened for append if (block.isDeleted() || !block.isCompleteOrCommitted()) { - neededReplications.remove(block, priority); + neededReconstruction.remove(block, priority); rw.resetTargets(); return false; } @@ -1673,7 +1676,7 @@ public class BlockManager implements BlockStatsMXBean { final int pendingNum = pendingReplications.getNumReplicas(block); if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum, requiredReplication)) { - neededReplications.remove(block, priority); + neededReconstruction.remove(block, priority); rw.resetTargets(); blockLog.debug("BLOCK* Removing {} from neededReplications as" + " it has enough replicas", block); @@ -1705,9 +1708,9 @@ public class BlockManager implements BlockStatsMXBean { + "pendingReplications", 
block); int numEffectiveReplicas = numReplicas.liveReplicas() + pendingNum; - // remove from neededReplications + // remove from neededReconstruction if(numEffectiveReplicas + targets.length >= requiredReplication) { - neededReplications.remove(block, priority); + neededReconstruction.remove(block, priority); } return true; } @@ -1852,7 +1855,7 @@ public class BlockManager implements BlockStatsMXBean { continue; } - if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY + if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY && !node.isDecommissionInProgress() && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { continue; // already reached replication limit @@ -1905,9 +1908,10 @@ public class BlockManager implements BlockStatsMXBean { continue; } NumberReplicas num = countNodes(timedOutItems[i]); - if (isNeededReplication(bi, num.liveReplicas())) { - neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), getReplication(bi)); + if (isNeededReconstruction(bi, num.liveReplicas())) { + neededReconstruction.add(bi, num.liveReplicas(), + num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), + getReplication(bi)); } } } finally { @@ -2777,7 +2781,7 @@ public class BlockManager implements BlockStatsMXBean { * intended for use with initial block report at startup. If not in startup * safe mode, will call standard addStoredBlock(). Assumes this method is * called "immediately" so there is no need to refresh the storedBlock from - * blocksMap. Doesn't handle underReplication/overReplication, or worry about + * blocksMap. Doesn't handle low redundancy/extra redundancy, or worry about * pendingReplications or corruptReplicas, because it's in startup safe mode. * Doesn't log every block, because there are typically millions of them. * @@ -2812,7 +2816,7 @@ public class BlockManager implements BlockStatsMXBean { /** * Modify (block-->datanode) map. 
Remove block from set of - * needed replications if this takes care of the problem. + * needed reconstruction if this takes care of the problem. * @return the block that is stored in blocksMap. */ private Block addStoredBlock(final BlockInfo block, @@ -2890,24 +2894,25 @@ public class BlockManager implements BlockStatsMXBean { return storedBlock; } - // do not try to handle over/under-replicated blocks during first safe mode + // do not try to handle extra/low redundancy blocks during first safe mode if (!isPopulatingReplQueues()) { return storedBlock; } - // handle underReplication/overReplication + // handle low redundancy/extra redundancy short fileReplication = getExpectedReplicaNum(storedBlock); - if (!isNeededReplication(storedBlock, numCurrentReplica)) { - neededReplications.remove(storedBlock, numCurrentReplica, + if (!isNeededReconstruction(storedBlock, numCurrentReplica)) { + neededReconstruction.remove(storedBlock, numCurrentReplica, num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), fileReplication); } else { - updateNeededReplications(storedBlock, curReplicaDelta, 0); + updateNeededReconstructions(storedBlock, curReplicaDelta, 0); } - if (shouldProcessOverReplicated(num, fileReplication)) { - processOverReplicatedBlock(storedBlock, fileReplication, node, delNodeHint); + if (shouldProcessExtraRedundancy(num, fileReplication)) { + processExtraRedundancyBlock(storedBlock, fileReplication, node, + delNodeHint); } - // If the file replication has reached desired value + // If the file redundancy has reached desired value // we can remove any corrupt replicas the block may have int corruptReplicasCount = corruptReplicas.numCorruptReplicas(storedBlock); int numCorruptNodes = num.corruptReplicas(); @@ -2922,7 +2927,7 @@ public class BlockManager implements BlockStatsMXBean { return storedBlock; } - private boolean shouldProcessOverReplicated(NumberReplicas num, + private boolean shouldProcessExtraRedundancy(NumberReplicas num, int expectedNum) { 
final int numCurrent = num.liveReplicas(); return numCurrent > expectedNum || @@ -2972,42 +2977,44 @@ public class BlockManager implements BlockStatsMXBean { /** * For each block in the name-node verify whether it belongs to any file, - * over or under replicated. Place it into the respective queue. + * extra or low redundancy. Place it into the respective queue. */ public void processMisReplicatedBlocks() { assert namesystem.hasWriteLock(); - stopReplicationInitializer(); - neededReplications.clear(); - replicationQueuesInitializer = new Daemon() { + stopReconstructionInitializer(); + neededReconstruction.clear(); + reconstructionQueuesInitializer = new Daemon() { @Override public void run() { try { processMisReplicatesAsync(); } catch (InterruptedException ie) { - LOG.info("Interrupted while processing replication queues."); + LOG.info("Interrupted while processing reconstruction queues."); } catch (Exception e) { - LOG.error("Error while processing replication queues async", e); + LOG.error("Error while processing reconstruction queues async", e); } } }; - replicationQueuesInitializer.setName("Replication Queue Initializer"); - replicationQueuesInitializer.start(); + reconstructionQueuesInitializer + .setName("Reconstruction Queue Initializer"); + reconstructionQueuesInitializer.start(); } /* - * Stop the ongoing initialisation of replication queues + * Stop the ongoing initialisation of reconstruction queues */ - private void stopReplicationInitializer() { - if (replicationQueuesInitializer != null) { - replicationQueuesInitializer.interrupt(); + private void stopReconstructionInitializer() { + if (reconstructionQueuesInitializer != null) { + reconstructionQueuesInitializer.interrupt(); try { - replicationQueuesInitializer.join(); + reconstructionQueuesInitializer.join(); } catch (final InterruptedException e) { - LOG.warn("Interrupted while waiting for replicationQueueInitializer. 
Returning.."); + LOG.warn("Interrupted while waiting for " + + "reconstructionQueueInitializer. Returning.."); return; } finally { - replicationQueuesInitializer = null; + reconstructionQueuesInitializer = null; } } } @@ -3025,7 +3032,7 @@ public class BlockManager implements BlockStatsMXBean { long startTimeMisReplicatedScan = Time.monotonicNow(); Iterator<BlockInfo> blocksItr = blocksMap.getBlocks().iterator(); long totalBlocks = blocksMap.size(); - replicationQueuesInitProgress = 0; + reconstructionQueuesInitProgress = 0; long totalProcessed = 0; long sleepDuration = Math.max(1, Math.min(numBlocksPerIteration/1000, 10000)); @@ -3067,7 +3074,7 @@ public class BlockManager implements BlockStatsMXBean { totalProcessed += processed; // there is a possibility that if any of the blocks deleted/added during // initialisation, then progress might be different. - replicationQueuesInitProgress = Math.min((double) totalProcessed + reconstructionQueuesInitProgress = Math.min((double) totalProcessed / totalBlocks, 1.0); if (!blocksItr.hasNext()) { @@ -3097,12 +3104,12 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Get the progress of the Replication queues initialisation + * Get the progress of the reconstruction queues initialisation * * @return Returns values between 0 and 1 for the progress. 
*/ - public double getReplicationQueuesInitProgress() { - return replicationQueuesInitProgress; + public double getReconstructionQueuesInitProgress() { + return reconstructionQueuesInitProgress; } /** @@ -3134,15 +3141,16 @@ public class BlockManager implements BlockStatsMXBean { short expectedReplication = getExpectedReplicaNum(block); NumberReplicas num = countNodes(block); final int numCurrentReplica = num.liveReplicas(); - // add to under-replicated queue if need to be - if (isNeededReplication(block, numCurrentReplica)) { - if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(), - num.decommissionedAndDecommissioning(), expectedReplication)) { + // add to low redundancy queue if need to be + if (isNeededReconstruction(block, numCurrentReplica)) { + if (neededReconstruction.add(block, numCurrentReplica, + num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), + expectedReplication)) { return MisReplicationResult.UNDER_REPLICATED; } } - if (shouldProcessOverReplicated(num, expectedReplication)) { + if (shouldProcessExtraRedundancy(num, expectedReplication)) { if (num.replicasOnStaleNodes() > 0) { // If any of the replicas of this block are on nodes that are // considered "stale", then these replicas may in fact have @@ -3152,8 +3160,8 @@ public class BlockManager implements BlockStatsMXBean { return MisReplicationResult.POSTPONE; } - // over-replicated block - processOverReplicatedBlock(block, expectedReplication, null, null); + // extra redundancy block + processExtraRedundancyBlock(block, expectedReplication, null, null); return MisReplicationResult.OVER_REPLICATED; } @@ -3167,12 +3175,12 @@ public class BlockManager implements BlockStatsMXBean { return; } - // update needReplication priority queues + // update neededReconstruction priority queues b.setReplication(newRepl); - updateNeededReplications(b, 0, newRepl - oldRepl); + updateNeededReconstructions(b, 0, newRepl - oldRepl); if (oldRepl > newRepl) { - 
processOverReplicatedBlock(b, newRepl, null, null); + processExtraRedundancyBlock(b, newRepl, null, null); } } @@ -3181,7 +3189,7 @@ public class BlockManager implements BlockStatsMXBean { * If there are any extras, call chooseExcessReplicates() to * mark them in the excessReplicateMap. */ - private void processOverReplicatedBlock(final BlockInfo block, + private void processExtraRedundancyBlock(final BlockInfo block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) { assert namesystem.hasWriteLock(); @@ -3405,7 +3413,7 @@ public class BlockManager implements BlockStatsMXBean { // if (!storedBlock.isDeleted()) { bmSafeMode.decrementSafeBlockCount(storedBlock); - updateNeededReplications(storedBlock, -1, 0); + updateNeededReconstructions(storedBlock, -1, 0); } excessReplicas.remove(node, storedBlock); @@ -3748,29 +3756,29 @@ public class BlockManager implements BlockStatsMXBean { /** * On stopping decommission, check if the node has excess replicas. - * If there are any excess replicas, call processOverReplicatedBlock(). - * Process over replicated blocks only when active NN is out of safe mode. + * If there are any excess replicas, call processExtraRedundancyBlock(). + * Process extra redundancy blocks only when active NN is out of safe mode. 
*/ - void processOverReplicatedBlocksOnReCommission( + void processExtraRedundancyBlocksOnReCommission( final DatanodeDescriptor srcNode) { if (!isPopulatingReplQueues()) { return; } final Iterator<BlockInfo> it = srcNode.getBlockIterator(); - int numOverReplicated = 0; + int numExtraRedundancy = 0; while(it.hasNext()) { final BlockInfo block = it.next(); int expectedReplication = this.getReplication(block); NumberReplicas num = countNodes(block); - if (shouldProcessOverReplicated(num, expectedReplication)) { - // over-replicated block - processOverReplicatedBlock(block, (short) expectedReplication, null, + if (shouldProcessExtraRedundancy(num, expectedReplication)) { + // extra redundancy block + processExtraRedundancyBlock(block, (short) expectedReplication, null, null); - numOverReplicated++; + numExtraRedundancy++; } } - LOG.info("Invalidated " + numOverReplicated + " over-replicated blocks on " + - srcNode + " during recommissioning"); + LOG.info("Invalidated " + numExtraRedundancy + + " extra redundancy blocks on " + srcNode + " during recommissioning"); } /** @@ -3789,9 +3797,9 @@ public class BlockManager implements BlockStatsMXBean { updateState(); if (pendingReplicationBlocksCount == 0 && - underReplicatedBlocksCount == 0) { - LOG.info("Node {} is dead and there are no under-replicated" + - " blocks or blocks pending replication. Safe to decommission.", + lowRedundancyBlocksCount == 0) { + LOG.info("Node {} is dead and there are no low redundancy" + + " blocks or blocks pending reconstruction. 
Safe to decommission.", node); return true; } @@ -3835,9 +3843,9 @@ public class BlockManager implements BlockStatsMXBean { block.setNumBytes(BlockCommand.NO_ACK); addToInvalidates(block); removeBlockFromMap(block); - // Remove the block from pendingReplications and neededReplications + // Remove the block from pendingReplications and neededReconstruction pendingReplications.remove(block); - neededReplications.remove(block, UnderReplicatedBlocks.LEVEL); + neededReconstruction.remove(block, LowRedundancyBlocks.LEVEL); if (postponedMisreplicatedBlocks.remove(block)) { postponedMisreplicatedBlocksCount.decrementAndGet(); } @@ -3859,8 +3867,8 @@ public class BlockManager implements BlockStatsMXBean { new Block(BlockIdManager.convertToStripedID(block.getBlockId()))); } - /** updates a block in under replication queue */ - private void updateNeededReplications(final BlockInfo block, + /** updates a block in needed reconstruction queue. */ + private void updateNeededReconstructions(final BlockInfo block, final int curReplicasDelta, int expectedReplicasDelta) { namesystem.writeLock(); try { @@ -3869,14 +3877,14 @@ public class BlockManager implements BlockStatsMXBean { } NumberReplicas repl = countNodes(block); int curExpectedReplicas = getReplication(block); - if (isNeededReplication(block, repl.liveReplicas())) { - neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(), - repl.decommissionedAndDecommissioning(), curExpectedReplicas, - curReplicasDelta, expectedReplicasDelta); + if (isNeededReconstruction(block, repl.liveReplicas())) { + neededReconstruction.update(block, repl.liveReplicas(), + repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), + curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { int oldReplicas = repl.liveReplicas()-curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(), + 
neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), oldExpectedReplicas); } } finally { @@ -3885,10 +3893,10 @@ public class BlockManager implements BlockStatsMXBean { } /** - * Check replication of the blocks in the collection. - * If any block is needed replication, insert it into the replication queue. + * Check sufficient redundancy of the blocks in the collection. If any block + * needs reconstruction, insert it into the reconstruction queue. * Otherwise, if the block is more than the expected replication factor, - * process it as an over replicated block. + * process it as an extra redundancy block. */ public void checkReplication(BlockCollection bc) { for (BlockInfo block : bc.getBlocks()) { @@ -3896,11 +3904,11 @@ public class BlockManager implements BlockStatsMXBean { final NumberReplicas n = countNodes(block); final int pending = pendingReplications.getNumReplicas(block); if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) { - neededReplications.add(block, n.liveReplicas() + pending, + neededReconstruction.add(block, n.liveReplicas() + pending, n.readOnlyReplicas(), n.decommissionedAndDecommissioning(), expected); - } else if (shouldProcessOverReplicated(n, expected)) { - processOverReplicatedBlock(block, expected, null, null); + } else if (shouldProcessExtraRedundancy(n, expected)) { + processExtraRedundancyBlock(block, expected, null, null); } } } @@ -3926,7 +3934,7 @@ public class BlockManager implements BlockStatsMXBean { try { // blocks should not be replicated or removed if safe mode is on if (namesystem.isInSafeMode()) { - LOG.debug("In safemode, not computing replication work"); + LOG.debug("In safemode, not computing reconstruction work"); return 0; } try { @@ -3980,10 +3988,10 @@ public class BlockManager implements BlockStatsMXBean { } /** - * A block needs replication if the number of replicas is less than expected - * or if it does not have enough racks. 
+ * A block needs reconstruction if the number of replicas is less than + * expected or if it does not have enough racks. */ - boolean isNeededReplication(BlockInfo storedBlock, int current) { + boolean isNeededReconstruction(BlockInfo storedBlock, int current) { int expected = getExpectedReplicaNum(storedBlock); return storedBlock.isComplete() && (current < expected || !isPlacementPolicySatisfied(storedBlock)); @@ -3997,12 +4005,12 @@ public class BlockManager implements BlockStatsMXBean { public long getMissingBlocksCount() { // not locking - return this.neededReplications.getCorruptBlockSize(); + return this.neededReconstruction.getCorruptBlockSize(); } public long getMissingReplOneBlocksCount() { // not locking - return this.neededReplications.getCorruptReplOneBlockSize(); + return this.neededReconstruction.getCorruptReplOneBlockSize(); } public BlockInfo addBlockCollection(BlockInfo block, @@ -4050,8 +4058,8 @@ public class BlockManager implements BlockStatsMXBean { * Return an iterator over the set of blocks for which there are no replicas. */ public Iterator<BlockInfo> getCorruptReplicaBlockIterator() { - return neededReplications.iterator( - UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS); + return neededReconstruction.iterator( + LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS); } /** @@ -4070,7 +4078,7 @@ public class BlockManager implements BlockStatsMXBean { /** @return the size of UnderReplicatedBlocks */ public int numOfUnderReplicatedBlocks() { - return neededReplications.size(); + return neededReconstruction.size(); } /** @@ -4232,7 +4240,7 @@ public class BlockManager implements BlockStatsMXBean { * this NameNode. 
*/ public void clearQueues() { - neededReplications.clear(); + neededReconstruction.clear(); pendingReplications.clear(); excessReplicas.clear(); invalidateBlocks.clear(); @@ -4298,7 +4306,7 @@ public class BlockManager implements BlockStatsMXBean { } public void shutdown() { - stopReplicationInitializer(); + stopReconstructionInitializer(); blocksMap.close(); MBeans.unregister(mxBeanName); mxBeanName = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 480670a..3b5f103 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -215,10 +215,10 @@ public class DecommissionManager { if (node.isDecommissionInProgress() || node.isDecommissioned()) { // Update DN stats maintained by HeartbeatManager hbManager.stopDecommission(node); - // Over-replicated blocks will be detected and processed when + // extra redundancy blocks will be detected and processed when // the dead node comes back and send in its full block report. 
if (node.isAlive()) { - blockManager.processOverReplicatedBlocksOnReCommission(node); + blockManager.processExtraRedundancyBlocksOnReCommission(node); } // Remove from tracking in DecommissionManager pendingNodes.remove(node); @@ -513,9 +513,9 @@ public class DecommissionManager { final List<BlockInfo> insufficientList, boolean pruneReliableBlocks) { boolean firstReplicationLog = true; - int underReplicatedBlocks = 0; + int lowRedundancyBlocks = 0; int decommissionOnlyReplicas = 0; - int underReplicatedInOpenFiles = 0; + int lowRedundancyInOpenFiles = 0; while (it.hasNext()) { numBlocksChecked++; final BlockInfo block = it.next(); @@ -537,22 +537,22 @@ public class DecommissionManager { final NumberReplicas num = blockManager.countNodes(block); final int liveReplicas = num.liveReplicas(); - // Schedule under-replicated blocks for replication if not already + // Schedule low redundancy blocks for reconstruction if not already // pending - if (blockManager.isNeededReplication(block, liveReplicas)) { - if (!blockManager.neededReplications.contains(block) && + if (blockManager.isNeededReconstruction(block, liveReplicas)) { + if (!blockManager.neededReconstruction.contains(block) && blockManager.pendingReplications.getNumReplicas(block) == 0 && blockManager.isPopulatingReplQueues()) { // Process these blocks only when active NN is out of safe mode. 
- blockManager.neededReplications.add(block, + blockManager.neededReconstruction.add(block, liveReplicas, num.readOnlyReplicas(), num.decommissionedAndDecommissioning(), blockManager.getExpectedReplicaNum(block)); } } - // Even if the block is under-replicated, - // it doesn't block decommission if it's sufficiently replicated + // Even if the block is without sufficient redundancy, + // it doesn't block decommission if has sufficient redundancy if (isSufficient(block, bc, num)) { if (pruneReliableBlocks) { it.remove(); @@ -560,7 +560,7 @@ public class DecommissionManager { continue; } - // We've found an insufficiently replicated block. + // We've found a block without sufficient redundancy. if (insufficientList != null) { insufficientList.add(block); } @@ -571,18 +571,18 @@ public class DecommissionManager { firstReplicationLog = false; } // Update various counts - underReplicatedBlocks++; + lowRedundancyBlocks++; if (bc.isUnderConstruction()) { - underReplicatedInOpenFiles++; + lowRedundancyInOpenFiles++; } if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) { decommissionOnlyReplicas++; } } - datanode.decommissioningStatus.set(underReplicatedBlocks, + datanode.decommissioningStatus.set(lowRedundancyBlocks, decommissionOnlyReplicas, - underReplicatedInOpenFiles); + lowRedundancyInOpenFiles); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java new file mode 100644 index 0000000..de8cf4e --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java @@ -0,0 +1,458 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; + +/** + * Keep prioritized queues of low redundant blocks. + * Blocks have redundancy priority, with priority + * {@link #QUEUE_HIGHEST_PRIORITY} indicating the highest priority. + * </p> + * Having a prioritised queue allows the {@link BlockManager} to select + * which blocks to replicate first -it tries to give priority to data + * that is most at risk or considered most valuable. + * + * <p/> + * The policy for choosing which priority to give added blocks + * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}. + * </p> + * <p>The queue order is as follows:</p> + * <ol> + * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that should be redundant + * first. 
That is blocks with only one copy, or blocks with zero live + * copies but a copy in a node being decommissioned. These blocks + * are at risk of loss if the disk or server on which they + * remain fails.</li> + * <li>{@link #QUEUE_VERY_LOW_REDUNDANCY}: blocks that are very + * under-replicated compared to their expected values. Currently + * that means the ratio of actual:expected is + * <i>less than</i> 1:3. These blocks may not be at risk, + * but they are clearly considered "important".</li> + * <li>{@link #QUEUE_LOW_REDUNDANCY}: blocks that are also under + * replicated, and the ratio of actual:expected is good enough that + * they do not need to go into the {@link #QUEUE_VERY_LOW_REDUNDANCY} + * queue.</li> + * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are at least as + * many copies of a block as required, but the blocks are not adequately + * distributed. Loss of a rack/switch could take all copies off-line.</li> + * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt + * and for which there are no non-corrupt copies (currently) available. + * The policy here is to keep those corrupt blocks replicated, but give + * blocks that are not corrupt higher priority.</li> + * </ol> + */ +class LowRedundancyBlocks implements Iterable<BlockInfo> { + /** The total number of queues : {@value} */ + static final int LEVEL = 5; + /** The queue with the highest priority: {@value} */ + static final int QUEUE_HIGHEST_PRIORITY = 0; + /** The queue for blocks that are way below their expected value : {@value} */ + static final int QUEUE_VERY_LOW_REDUNDANCY = 1; + /** + * The queue for "normally" without sufficient redundancy blocks : {@value}. 
+ */ + static final int QUEUE_LOW_REDUNDANCY = 2; + /** The queue for blocks that have the right number of replicas, + * but which the block manager felt were badly distributed: {@value} + */ + static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3; + /** The queue for corrupt blocks: {@value} */ + static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; + /** the queues themselves */ + private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues + = new ArrayList<>(LEVEL); + + /** The number of corrupt blocks with replication factor 1 */ + private int corruptReplOneBlocks = 0; + + /** Create an object. */ + LowRedundancyBlocks() { + for (int i = 0; i < LEVEL; i++) { + priorityQueues.add(new LightWeightLinkedSet<BlockInfo>()); + } + } + + /** + * Empty the queues. + */ + synchronized void clear() { + for (int i = 0; i < LEVEL; i++) { + priorityQueues.get(i).clear(); + } + corruptReplOneBlocks = 0; + } + + /** Return the total number of insufficient redundancy blocks. */ + synchronized int size() { + int size = 0; + for (int i = 0; i < LEVEL; i++) { + size += priorityQueues.get(i).size(); + } + return size; + } + + /** + * Return the number of insufficiently redundant blocks excluding corrupt + * blocks. + */ + synchronized int getLowRedundancyBlockCount() { + int size = 0; + for (int i = 0; i < LEVEL; i++) { + if (i != QUEUE_WITH_CORRUPT_BLOCKS) { + size += priorityQueues.get(i).size(); + } + } + return size; + } + + /** Return the number of corrupt blocks */ + synchronized int getCorruptBlockSize() { + return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size(); + } + + /** Return the number of corrupt blocks with replication factor 1 */ + synchronized int getCorruptReplOneBlockSize() { + return corruptReplOneBlocks; + } + + /** Check if a block is in the neededReconstruction queue. 
*/ + synchronized boolean contains(BlockInfo block) { + for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) { + if (set.contains(block)) { + return true; + } + } + return false; + } + + /** Return the priority of a block + * @param curReplicas current number of replicas of the block + * @param expectedReplicas expected number of replicas of the block + * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) + */ + private int getPriority(BlockInfo block, + int curReplicas, + int readOnlyReplicas, + int decommissionedReplicas, + int expectedReplicas) { + assert curReplicas >= 0 : "Negative replicas!"; + if (curReplicas >= expectedReplicas) { + // Block has enough copies, but not enough racks + return QUEUE_REPLICAS_BADLY_DISTRIBUTED; + } + if (block.isStriped()) { + BlockInfoStriped sblk = (BlockInfoStriped) block; + return getPriorityStriped(curReplicas, decommissionedReplicas, + sblk.getRealDataBlockNum(), sblk.getParityBlockNum()); + } else { + return getPriorityContiguous(curReplicas, readOnlyReplicas, + decommissionedReplicas, expectedReplicas); + } + } + + private int getPriorityContiguous(int curReplicas, int readOnlyReplicas, + int decommissionedReplicas, int expectedReplicas) { + if (curReplicas == 0) { + // If there are zero non-decommissioned replicas but there are + // some decommissioned replicas, then assign them highest priority + if (decommissionedReplicas > 0) { + return QUEUE_HIGHEST_PRIORITY; + } + if (readOnlyReplicas > 0) { + // only has read-only replicas, highest risk + // since the read-only replicas may go down all together. + return QUEUE_HIGHEST_PRIORITY; + } + //all we have are corrupt blocks + return QUEUE_WITH_CORRUPT_BLOCKS; + } else if (curReplicas == 1) { + // only one replica, highest risk of loss + // highest priority + return QUEUE_HIGHEST_PRIORITY; + } else if ((curReplicas * 3) < expectedReplicas) { + //there are less than a third as many replicas as expected; + //this is considered very insufficiently redundant blocks. 
+ return QUEUE_VERY_LOW_REDUNDANCY; + } else { + //add to the normal queue for insufficiently redundant blocks + return QUEUE_LOW_REDUNDANCY; + } + } + + private int getPriorityStriped(int curReplicas, int decommissionedReplicas, + short dataBlkNum, short parityBlkNum) { + if (curReplicas < dataBlkNum) { + // There are some replicas on decommissioned nodes so it's not corrupted + if (curReplicas + decommissionedReplicas >= dataBlkNum) { + return QUEUE_HIGHEST_PRIORITY; + } + return QUEUE_WITH_CORRUPT_BLOCKS; + } else if (curReplicas == dataBlkNum) { + // highest risk of loss, highest priority + return QUEUE_HIGHEST_PRIORITY; + } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) { + // can only afford one replica loss + // this is considered very insufficiently redundant blocks. + return QUEUE_VERY_LOW_REDUNDANCY; + } else { + // add to the normal queue for insufficiently redundant blocks. + return QUEUE_LOW_REDUNDANCY; + } + } + + /** + * Add a block to insufficiently redundant queue according to its priority. + * + * @param block a low redundancy block + * @param curReplicas current number of replicas of the block + * @param decomissionedReplicas the number of decommissioned replicas + * @param expectedReplicas expected number of replicas of the block + * @return true if the block was added to a queue. 
+ */ + synchronized boolean add(BlockInfo block, + int curReplicas, + int readOnlyReplicas, + int decomissionedReplicas, + int expectedReplicas) { + assert curReplicas >= 0 : "Negative replicas!"; + final int priLevel = getPriority(block, curReplicas, readOnlyReplicas, + decomissionedReplicas, expectedReplicas); + if(priorityQueues.get(priLevel).add(block)) { + if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && + expectedReplicas == 1) { + corruptReplOneBlocks++; + } + NameNode.blockStateChangeLog.debug( + "BLOCK* NameSystem.LowRedundancyBlock.add: {}" + + " has only {} replicas and need {} replicas so is added to" + + " neededReconstructions at priority level {}", + block, curReplicas, expectedReplicas, priLevel); + + return true; + } + return false; + } + + /** Remove a block from a low redundancy queue. */ + synchronized boolean remove(BlockInfo block, + int oldReplicas, + int oldReadOnlyReplicas, + int decommissionedReplicas, + int oldExpectedReplicas) { + final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas, + decommissionedReplicas, oldExpectedReplicas); + boolean removedBlock = remove(block, priLevel); + if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && + oldExpectedReplicas == 1 && + removedBlock) { + corruptReplOneBlocks--; + assert corruptReplOneBlocks >= 0 : + "Number of corrupt blocks with replication factor 1 " + + "should be non-negative"; + } + return removedBlock; + } + + /** + * Remove a block from the low redundancy queues. + * + * The priLevel parameter is a hint of which queue to query + * first: if negative or >= {@link #LEVEL} this shortcutting + * is not attempted. + * + * If the block is not found in the nominated queue, an attempt is made to + * remove it from all queues. + * + * <i>Warning:</i> This is not a synchronized method. 
+ * @param block block to remove + * @param priLevel expected priority level + * @return true if the block was found and removed from one of the priority + * queues + */ + boolean remove(BlockInfo block, int priLevel) { + if(priLevel >= 0 && priLevel < LEVEL + && priorityQueues.get(priLevel).remove(block)) { + NameNode.blockStateChangeLog.debug( + "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block {}" + + " from priority queue {}", + block, priLevel); + return true; + } else { + // Try to remove the block from all queues if the block was + // not found in the queue for the given priority level. + for (int i = 0; i < LEVEL; i++) { + if (i != priLevel && priorityQueues.get(i).remove(block)) { + NameNode.blockStateChangeLog.debug( + "BLOCK* NameSystem.LowRedundancyBlock.remove: Removing block" + + " {} from priority queue {}", block, i); + return true; + } + } + } + return false; + } + + /** + * Recalculate and potentially update the priority level of a block. + * + * If the block priority has changed from before an attempt is made to + * remove it from the block queue. Regardless of whether or not the block + * is in the block queue of (recalculate) priority, an attempt is made + * to add it to that queue. This ensures that the block will be + * in its expected priority queue (and only that queue) by the end of the + * method call. 
+ * @param block a low redundancy block + * @param curReplicas current number of replicas of the block + * @param decommissionedReplicas the number of decommissioned replicas + * @param curExpectedReplicas expected number of replicas of the block + * @param curReplicasDelta the change in the replicate count from before + * @param expectedReplicasDelta the change in the expected replica count + * from before + */ + synchronized void update(BlockInfo block, int curReplicas, + int readOnlyReplicas, int decommissionedReplicas, + int curExpectedReplicas, + int curReplicasDelta, int expectedReplicasDelta) { + int oldReplicas = curReplicas-curReplicasDelta; + int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; + int curPri = getPriority(block, curReplicas, readOnlyReplicas, + decommissionedReplicas, curExpectedReplicas); + int oldPri = getPriority(block, oldReplicas, readOnlyReplicas, + decommissionedReplicas, oldExpectedReplicas); + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " + + block + + " curReplicas " + curReplicas + + " curExpectedReplicas " + curExpectedReplicas + + " oldReplicas " + oldReplicas + + " oldExpectedReplicas " + oldExpectedReplicas + + " curPri " + curPri + + " oldPri " + oldPri); + } + if(oldPri != curPri) { + remove(block, oldPri); + } + if(priorityQueues.get(curPri).add(block)) { + NameNode.blockStateChangeLog.debug( + "BLOCK* NameSystem.LowRedundancyBlock.update: {} has only {} " + + "replicas and needs {} replicas so is added to " + + "neededReconstructions at priority level {}", + block, curReplicas, curExpectedReplicas, curPri); + + } + if (oldPri != curPri || expectedReplicasDelta != 0) { + // corruptReplOneBlocks could possibly change + if (curPri == QUEUE_WITH_CORRUPT_BLOCKS && + curExpectedReplicas == 1) { + // add a new corrupt block with replication factor 1 + corruptReplOneBlocks++; + } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS && + curExpectedReplicas - 
expectedReplicasDelta == 1) { + // remove an existing corrupt block with replication factor 1 + corruptReplOneBlocks--; + } + } + } + + /** + * Get a list of block lists without sufficient redundancy. The index of + * block lists represents its replication priority. Iterates each block list + * in priority order beginning with the highest priority list. Iterators use + * a bookmark to resume where the previous iteration stopped. Returns when + * the block count is met or iteration reaches the end of the lowest priority + * list, in which case bookmarks for each block list are reset to the heads + * of their respective lists. + * + * @param blocksToProcess - number of blocks to fetch from low redundancy + * blocks. + * @return Return a list of block lists to be replicated. The block list + * index represents its redundancy priority. + */ + synchronized List<List<BlockInfo>> chooseLowRedundancyBlocks( + int blocksToProcess) { + final List<List<BlockInfo>> blocksToReconstruct = new ArrayList<>(LEVEL); + + int count = 0; + int priority = 0; + for (; count < blocksToProcess && priority < LEVEL; priority++) { + if (priority == QUEUE_WITH_CORRUPT_BLOCKS) { + // do not choose corrupted blocks. + continue; + } + + // Go through all blocks that need reconstructions with current priority. + // Set the iterator to the first unprocessed block at this priority level + final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark(); + final List<BlockInfo> blocks = new LinkedList<>(); + blocksToReconstruct.add(blocks); + // Loop through all remaining blocks in the list. + for(; count < blocksToProcess && i.hasNext(); count++) { + blocks.add(i.next()); + } + } + + if (priority == LEVEL) { + // Reset all bookmarks because there were no recently added blocks. + for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) { + q.resetBookmark(); + } + } + + return blocksToReconstruct; + } + + /** Returns an iterator of all blocks in a given priority queue. 
*/ + synchronized Iterator<BlockInfo> iterator(int level) { + return priorityQueues.get(level).iterator(); + } + + /** Return an iterator of all the low redundancy blocks. */ + @Override + public synchronized Iterator<BlockInfo> iterator() { + final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator(); + return new Iterator<BlockInfo>() { + private Iterator<BlockInfo> b = q.next().iterator(); + + @Override + public BlockInfo next() { + hasNext(); + return b.next(); + } + + @Override + public boolean hasNext() { + for(; !b.hasNext() && q.hasNext(); ) { + b = q.next().iterator(); + } + return b.hasNext(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java deleted file mode 100644 index 5e8f7ed..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java +++ /dev/null @@ -1,448 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs.server.blockmanagement; - -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; - -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; - -/** - * Keep prioritized queues of under replicated blocks. - * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY} - * indicating the highest priority. - * </p> - * Having a prioritised queue allows the {@link BlockManager} to select - * which blocks to replicate first -it tries to give priority to data - * that is most at risk or considered most valuable. - * - * <p/> - * The policy for choosing which priority to give added blocks - * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}. - * </p> - * <p>The queue order is as follows:</p> - * <ol> - * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated - * first. That is blocks with only one copy, or blocks with zero live - * copies but a copy in a node being decommissioned. These blocks - * are at risk of loss if the disk or server on which they - * remain fails.</li> - * <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very - * under-replicated compared to their expected values. Currently - * that means the ratio of the ratio of actual:expected means that - * there is <i>less than</i> 1:3.</li>. These blocks may not be at risk, - * but they are clearly considered "important". 
- * <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under - * replicated, and the ratio of actual:expected is good enough that - * they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED} - * queue.</li> - * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are as least as - * many copies of a block as required, but the blocks are not adequately - * distributed. Loss of a rack/switch could take all copies off-line.</li> - * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt - * and for which there are no-non-corrupt copies (currently) available. - * The policy here is to keep those corrupt blocks replicated, but give - * blocks that are not corrupt higher priority.</li> - * </ol> - */ -class UnderReplicatedBlocks implements Iterable<BlockInfo> { - /** The total number of queues : {@value} */ - static final int LEVEL = 5; - /** The queue with the highest priority: {@value} */ - static final int QUEUE_HIGHEST_PRIORITY = 0; - /** The queue for blocks that are way below their expected value : {@value} */ - static final int QUEUE_VERY_UNDER_REPLICATED = 1; - /** The queue for "normally" under-replicated blocks: {@value} */ - static final int QUEUE_UNDER_REPLICATED = 2; - /** The queue for blocks that have the right number of replicas, - * but which the block manager felt were badly distributed: {@value} - */ - static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3; - /** The queue for corrupt blocks: {@value} */ - static final int QUEUE_WITH_CORRUPT_BLOCKS = 4; - /** the queues themselves */ - private final List<LightWeightLinkedSet<BlockInfo>> priorityQueues - = new ArrayList<>(LEVEL); - - /** The number of corrupt blocks with replication factor 1 */ - private int corruptReplOneBlocks = 0; - - /** Create an object. */ - UnderReplicatedBlocks() { - for (int i = 0; i < LEVEL; i++) { - priorityQueues.add(new LightWeightLinkedSet<BlockInfo>()); - } - } - - /** - * Empty the queues. 
- */ - synchronized void clear() { - for (int i = 0; i < LEVEL; i++) { - priorityQueues.get(i).clear(); - } - corruptReplOneBlocks = 0; - } - - /** Return the total number of under replication blocks */ - synchronized int size() { - int size = 0; - for (int i = 0; i < LEVEL; i++) { - size += priorityQueues.get(i).size(); - } - return size; - } - - /** Return the number of under replication blocks excluding corrupt blocks */ - synchronized int getUnderReplicatedBlockCount() { - int size = 0; - for (int i = 0; i < LEVEL; i++) { - if (i != QUEUE_WITH_CORRUPT_BLOCKS) { - size += priorityQueues.get(i).size(); - } - } - return size; - } - - /** Return the number of corrupt blocks */ - synchronized int getCorruptBlockSize() { - return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size(); - } - - /** Return the number of corrupt blocks with replication factor 1 */ - synchronized int getCorruptReplOneBlockSize() { - return corruptReplOneBlocks; - } - - /** Check if a block is in the neededReplication queue */ - synchronized boolean contains(BlockInfo block) { - for(LightWeightLinkedSet<BlockInfo> set : priorityQueues) { - if (set.contains(block)) { - return true; - } - } - return false; - } - - /** Return the priority of a block - * @param curReplicas current number of replicas of the block - * @param expectedReplicas expected number of replicas of the block - * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1) - */ - private int getPriority(BlockInfo block, - int curReplicas, - int readOnlyReplicas, - int decommissionedReplicas, - int expectedReplicas) { - assert curReplicas >= 0 : "Negative replicas!"; - if (curReplicas >= expectedReplicas) { - // Block has enough copies, but not enough racks - return QUEUE_REPLICAS_BADLY_DISTRIBUTED; - } - if (block.isStriped()) { - BlockInfoStriped sblk = (BlockInfoStriped) block; - return getPriorityStriped(curReplicas, decommissionedReplicas, - sblk.getRealDataBlockNum(), sblk.getParityBlockNum()); - } else { - 
return getPriorityContiguous(curReplicas, readOnlyReplicas, - decommissionedReplicas, expectedReplicas); - } - } - - private int getPriorityContiguous(int curReplicas, int readOnlyReplicas, - int decommissionedReplicas, int expectedReplicas) { - if (curReplicas == 0) { - // If there are zero non-decommissioned replicas but there are - // some decommissioned replicas, then assign them highest priority - if (decommissionedReplicas > 0) { - return QUEUE_HIGHEST_PRIORITY; - } - if (readOnlyReplicas > 0) { - // only has read-only replicas, highest risk - // since the read-only replicas may go down all together. - return QUEUE_HIGHEST_PRIORITY; - } - //all we have are corrupt blocks - return QUEUE_WITH_CORRUPT_BLOCKS; - } else if (curReplicas == 1) { - // only one replica, highest risk of loss - // highest priority - return QUEUE_HIGHEST_PRIORITY; - } else if ((curReplicas * 3) < expectedReplicas) { - //there is less than a third as many blocks as requested; - //this is considered very under-replicated - return QUEUE_VERY_UNDER_REPLICATED; - } else { - //add to the normal queue for under replicated blocks - return QUEUE_UNDER_REPLICATED; - } - } - - private int getPriorityStriped(int curReplicas, int decommissionedReplicas, - short dataBlkNum, short parityBlkNum) { - if (curReplicas < dataBlkNum) { - // There are some replicas on decommissioned nodes so it's not corrupted - if (curReplicas + decommissionedReplicas >= dataBlkNum) { - return QUEUE_HIGHEST_PRIORITY; - } - return QUEUE_WITH_CORRUPT_BLOCKS; - } else if (curReplicas == dataBlkNum) { - // highest risk of loss, highest priority - return QUEUE_HIGHEST_PRIORITY; - } else if ((curReplicas - dataBlkNum) * 3 < parityBlkNum + 1) { - // there is less than a third as many blocks as requested; - // this is considered very under-replicated - return QUEUE_VERY_UNDER_REPLICATED; - } else { - // add to the normal queue for under replicated blocks - return QUEUE_UNDER_REPLICATED; - } - } - - /** add a block to a under 
replication queue according to its priority - * @param block a under replication block - * @param curReplicas current number of replicas of the block - * @param decomissionedReplicas the number of decommissioned replicas - * @param expectedReplicas expected number of replicas of the block - * @return true if the block was added to a queue. - */ - synchronized boolean add(BlockInfo block, - int curReplicas, - int readOnlyReplicas, - int decomissionedReplicas, - int expectedReplicas) { - assert curReplicas >= 0 : "Negative replicas!"; - final int priLevel = getPriority(block, curReplicas, readOnlyReplicas, - decomissionedReplicas, expectedReplicas); - if(priorityQueues.get(priLevel).add(block)) { - if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && - expectedReplicas == 1) { - corruptReplOneBlocks++; - } - NameNode.blockStateChangeLog.debug( - "BLOCK* NameSystem.UnderReplicationBlock.add: {}" - + " has only {} replicas and need {} replicas so is added to" + - " neededReplications at priority level {}", block, curReplicas, - expectedReplicas, priLevel); - - return true; - } - return false; - } - - /** remove a block from a under replication queue */ - synchronized boolean remove(BlockInfo block, - int oldReplicas, - int oldReadOnlyReplicas, - int decommissionedReplicas, - int oldExpectedReplicas) { - final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas, - decommissionedReplicas, oldExpectedReplicas); - boolean removedBlock = remove(block, priLevel); - if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS && - oldExpectedReplicas == 1 && - removedBlock) { - corruptReplOneBlocks--; - assert corruptReplOneBlocks >= 0 : - "Number of corrupt blocks with replication factor 1 " + - "should be non-negative"; - } - return removedBlock; - } - - /** - * Remove a block from the under replication queues. - * - * The priLevel parameter is a hint of which queue to query - * first: if negative or >= {@link #LEVEL} this shortcutting - * is not attmpted. 
- * - * If the block is not found in the nominated queue, an attempt is made to - * remove it from all queues. - * - * <i>Warning:</i> This is not a synchronized method. - * @param block block to remove - * @param priLevel expected privilege level - * @return true if the block was found and removed from one of the priority queues - */ - boolean remove(BlockInfo block, int priLevel) { - if(priLevel >= 0 && priLevel < LEVEL - && priorityQueues.get(priLevel).remove(block)) { - NameNode.blockStateChangeLog.debug( - "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block {}" + - " from priority queue {}", block, priLevel); - return true; - } else { - // Try to remove the block from all queues if the block was - // not found in the queue for the given priority level. - for (int i = 0; i < LEVEL; i++) { - if (i != priLevel && priorityQueues.get(i).remove(block)) { - NameNode.blockStateChangeLog.debug( - "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" + - " {} from priority queue {}", block, i); - return true; - } - } - } - return false; - } - - /** - * Recalculate and potentially update the priority level of a block. - * - * If the block priority has changed from before an attempt is made to - * remove it from the block queue. Regardless of whether or not the block - * is in the block queue of (recalculate) priority, an attempt is made - * to add it to that queue. This ensures that the block will be - * in its expected priority queue (and only that queue) by the end of the - * method call. 
- * @param block a under replicated block - * @param curReplicas current number of replicas of the block - * @param decommissionedReplicas the number of decommissioned replicas - * @param curExpectedReplicas expected number of replicas of the block - * @param curReplicasDelta the change in the replicate count from before - * @param expectedReplicasDelta the change in the expected replica count from before - */ - synchronized void update(BlockInfo block, int curReplicas, - int readOnlyReplicas, int decommissionedReplicas, - int curExpectedReplicas, - int curReplicasDelta, int expectedReplicasDelta) { - int oldReplicas = curReplicas-curReplicasDelta; - int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; - int curPri = getPriority(block, curReplicas, readOnlyReplicas, - decommissionedReplicas, curExpectedReplicas); - int oldPri = getPriority(block, oldReplicas, readOnlyReplicas, - decommissionedReplicas, oldExpectedReplicas); - if(NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + - block + - " curReplicas " + curReplicas + - " curExpectedReplicas " + curExpectedReplicas + - " oldReplicas " + oldReplicas + - " oldExpectedReplicas " + oldExpectedReplicas + - " curPri " + curPri + - " oldPri " + oldPri); - } - if(oldPri != curPri) { - remove(block, oldPri); - } - if(priorityQueues.get(curPri).add(block)) { - NameNode.blockStateChangeLog.debug( - "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " + - "replicas and needs {} replicas so is added to " + - "neededReplications at priority level {}", block, curReplicas, - curExpectedReplicas, curPri); - - } - if (oldPri != curPri || expectedReplicasDelta != 0) { - // corruptReplOneBlocks could possibly change - if (curPri == QUEUE_WITH_CORRUPT_BLOCKS && - curExpectedReplicas == 1) { - // add a new corrupt block with replication factor 1 - corruptReplOneBlocks++; - } else if (oldPri == QUEUE_WITH_CORRUPT_BLOCKS && - curExpectedReplicas - 
expectedReplicasDelta == 1) { - // remove an existing corrupt block with replication factor 1 - corruptReplOneBlocks--; - } - } - } - - /** - * Get a list of block lists to be replicated. The index of block lists - * represents its replication priority. Iterates each block list in priority - * order beginning with the highest priority list. Iterators use a bookmark to - * resume where the previous iteration stopped. Returns when the block count - * is met or iteration reaches the end of the lowest priority list, in which - * case bookmarks for each block list are reset to the heads of their - * respective lists. - * - * @param blocksToProcess - number of blocks to fetch from underReplicated - * blocks. - * @return Return a list of block lists to be replicated. The block list index - * represents its replication priority. - */ - synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks( - int blocksToProcess) { - final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL); - - int count = 0; - int priority = 0; - for (; count < blocksToProcess && priority < LEVEL; priority++) { - if (priority == QUEUE_WITH_CORRUPT_BLOCKS) { - // do not choose corrupted blocks. - continue; - } - - // Go through all blocks that need replications with current priority. - // Set the iterator to the first unprocessed block at this priority level. - final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark(); - final List<BlockInfo> blocks = new LinkedList<>(); - blocksToReplicate.add(blocks); - // Loop through all remaining blocks in the list. - for(; count < blocksToProcess && i.hasNext(); count++) { - blocks.add(i.next()); - } - } - - if (priority == LEVEL) { - // Reset all bookmarks because there were no recently added blocks. 
- for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) { - q.resetBookmark(); - } - } - - return blocksToReplicate; - } - - /** returns an iterator of all blocks in a given priority queue */ - synchronized Iterator<BlockInfo> iterator(int level) { - return priorityQueues.get(level).iterator(); - } - - /** return an iterator of all the under replication blocks */ - @Override - public synchronized Iterator<BlockInfo> iterator() { - final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator(); - return new Iterator<BlockInfo>() { - private Iterator<BlockInfo> b = q.next().iterator(); - - @Override - public BlockInfo next() { - hasNext(); - return b.next(); - } - - @Override - public boolean hasNext() { - for(; !b.hasNext() && q.hasNext(); ) { - b = q.next().iterator(); - } - return b.hasNext(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index c0a4fdb..1b565ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -74,7 +74,7 @@ public class BlockManagerTestUtil { final BlockInfo storedBlock = bm.getStoredBlock(b); return new int[]{getNumberOfRacks(bm, b), bm.countNodes(storedBlock).liveReplicas(), - bm.neededReplications.contains(storedBlock) ? 
1 : 0}; + bm.neededReconstruction.contains(storedBlock) ? 1 : 0}; } finally { namesystem.readUnlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 5511b99..3a974e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -397,20 +397,20 @@ public class TestBlockManager { addNodes(nodes); List<DatanodeDescriptor> origNodes = nodes.subList(0, 3); for (int i = 0; i < NUM_TEST_ITERS; i++) { - doTestSingleRackClusterIsSufficientlyReplicated(i, origNodes); + doTestSingleRackClusterHasSufficientRedundancy(i, origNodes); } } - private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex, + private void doTestSingleRackClusterHasSufficientRedundancy(int testIndex, List<DatanodeDescriptor> origNodes) throws Exception { assertEquals(0, bm.numOfUnderReplicatedBlocks()); BlockInfo block = addBlockOnNodes(testIndex, origNodes); - assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block))); + assertFalse(bm.isNeededReconstruction(block, bm.countLiveNodes(block))); } @Test(timeout = 60000) - public void testNeededReplicationWhileAppending() throws IOException { + public void testNeededReconstructionWhileAppending() throws IOException { Configuration conf = new HdfsConfiguration(); String src = "/test-file"; Path file = new Path(src); @@ -449,7 +449,7 @@ public class TestBlockManager { namenode.updatePipeline(clientName, 
oldBlock, newBlock, newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs()); BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock()); - assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi))); + assertFalse(bm.isNeededReconstruction(bi, bm.countLiveNodes(bi))); } finally { IOUtils.closeStream(out); } @@ -601,7 +601,7 @@ public class TestBlockManager { liveNodes, new NumberReplicas(), new ArrayList<Byte>(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY)[0]); + LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY)[0]); assertEquals("Does not choose a source node for a less-than-highest-priority" + " replication since all available source nodes have reached" @@ -612,7 +612,7 @@ public class TestBlockManager { liveNodes, new NumberReplicas(), new ArrayList<Byte>(), - UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).length); + LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY).length); // Increase the replication count to test replication count > hard limit DatanodeStorageInfo targets[] = { origNodes.get(1).getStorageInfos()[0] }; @@ -626,7 +626,7 @@ public class TestBlockManager { liveNodes, new NumberReplicas(), new ArrayList<Byte>(), - UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).length); + LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length); } @Test @@ -652,7 +652,7 @@ public class TestBlockManager { cntNodes, liveNodes, new NumberReplicas(), new LinkedList<Byte>(), - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED)[0]); + LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY)[0]); // Increase the replication count to test replication count > hard limit @@ -666,7 +666,7 @@ public class TestBlockManager { cntNodes, liveNodes, new NumberReplicas(), new LinkedList<Byte>(), - UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).length); + LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY).length); } @Test http://git-wip-us.apache.org/repos/asf/hadoop/blob/32d043d9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java 
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
new file mode 100644
index 0000000..2eb7abf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestLowRedundancyBlockQueues.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class TestLowRedundancyBlockQueues {
+
+  private final ErasureCodingPolicy ecPolicy =
+      ErasureCodingPolicyManager.getSystemDefaultPolicy();
+
+  private BlockInfo genBlockInfo(long id) {
+    return new BlockInfoContiguous(new Block(id), (short) 3);
+  }
+
+  private BlockInfo genStripedBlockInfo(long id, long numBytes) {
+    BlockInfoStriped sblk = new BlockInfoStriped(new Block(id), ecPolicy);
+    sblk.setNumBytes(numBytes);
+    return sblk;
+  }
+
+  /**
+   * Test that adding blocks with different replication counts puts them
+   * into different queues.
+   * @throws Throwable if something goes wrong
+   */
+  @Test
+  public void testBlockPriorities() throws Throwable {
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    BlockInfo block1 = genBlockInfo(1);
+    BlockInfo block2 = genBlockInfo(2);
+    BlockInfo block_very_low_redundancy = genBlockInfo(3);
+    BlockInfo block_corrupt = genBlockInfo(4);
+    BlockInfo block_corrupt_repl_one = genBlockInfo(5);
+
+    //add a block with a single entry
+    assertAdded(queues, block1, 1, 0, 3);
+
+    assertEquals(1, queues.getLowRedundancyBlockCount());
+    assertEquals(1, queues.size());
+    assertInLevel(queues, block1, LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+    //repeated additions fail
+    assertFalse(queues.add(block1, 1, 0, 0, 3));
+
+    //add a second block with two replicas
+    assertAdded(queues, block2, 2, 0, 3);
+    assertEquals(2, queues.getLowRedundancyBlockCount());
+    assertEquals(2, queues.size());
+    assertInLevel(queues, block2, LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+    //now try to add a block that is corrupt
+    assertAdded(queues, block_corrupt, 0, 0, 3);
+    assertEquals(3, queues.size());
+    assertEquals(2, queues.getLowRedundancyBlockCount());
+    assertEquals(1, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+                  LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+
+    //insert a block with very low redundancy
+    assertAdded(queues, block_very_low_redundancy, 4, 0, 25);
+    assertInLevel(queues, block_very_low_redundancy,
+                  LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+
+    //insert a corrupt block with replication factor 1
+    assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
+    assertEquals(2, queues.getCorruptBlockSize());
+    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
+    assertEquals(0, queues.getCorruptReplOneBlockSize());
+    queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
+    assertEquals(1, queues.getCorruptReplOneBlockSize());
+    queues.update(block_very_low_redundancy, 0, 0, 0, 1, -4, -24);
+    assertEquals(2, queues.getCorruptReplOneBlockSize());
+  }
+
+  @Test
+  public void testStripedBlockPriorities() throws Throwable {
+    int dataBlkNum = ecPolicy.getNumDataUnits();
+    int parityBlkNUm = ecPolicy.getNumParityUnits();
+    doTestStripedBlockPriorities(1, parityBlkNUm);
+    doTestStripedBlockPriorities(dataBlkNum, parityBlkNUm);
+  }
+
+  private void doTestStripedBlockPriorities(int dataBlkNum, int parityBlkNum)
+      throws Throwable {
+    int groupSize = dataBlkNum + parityBlkNum;
+    long numBytes = ecPolicy.getCellSize() * dataBlkNum;
+    LowRedundancyBlocks queues = new LowRedundancyBlocks();
+    int numUR = 0;
+    int numCorrupt = 0;
+
+    // add low redundancy blocks
+    for (int i = 0; dataBlkNum + i < groupSize; i++) {
+      BlockInfo block = genStripedBlockInfo(-100 - 100 * i, numBytes);
+      assertAdded(queues, block, dataBlkNum + i, 0, groupSize);
+      numUR++;
+      assertEquals(numUR, queues.getLowRedundancyBlockCount());
+      assertEquals(numUR + numCorrupt, queues.size());
+      if (i == 0) {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+      } else if (i * 3 < parityBlkNum + 1) {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_VERY_LOW_REDUNDANCY);
+      } else {
+        assertInLevel(queues, block,
+            LowRedundancyBlocks.QUEUE_LOW_REDUNDANCY);
+      }
+    }
+
+    // add a corrupted block
+    BlockInfo block_corrupt = genStripedBlockInfo(-10, numBytes);
+    assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    assertAdded(queues, block_corrupt, dataBlkNum - 1, 0, groupSize);
+    numCorrupt++;
+    assertEquals(numUR + numCorrupt, queues.size());
+    assertEquals(numUR, queues.getLowRedundancyBlockCount());
+    assertEquals(numCorrupt, queues.getCorruptBlockSize());
+    assertInLevel(queues, block_corrupt,
+        LowRedundancyBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
+  }
+
+  private void assertAdded(LowRedundancyBlocks queues,
+                           BlockInfo block,
+                           int curReplicas,
+                           int decomissionedReplicas,
+                           int expectedReplicas) {
+    assertTrue("Failed to add " + block,
+               queues.add(block,
+                          curReplicas, 0,
+                          decomissionedReplicas,
+                          expectedReplicas));
+  }
+
+  /**
+   * Determine whether or not a block is in a level without changing the API.
+   * Instead get the per-level iterator and run through it looking for a match.
+   * If the block is not found, an assertion is thrown.
+   *
+   * This is inefficient, but this is only a test case.
+   * @param queues queues to scan
+   * @param block block to look for
+   * @param level level to select
+   */
+  private void assertInLevel(LowRedundancyBlocks queues,
+                             Block block,
+                             int level) {
+    final Iterator<BlockInfo> bi = queues.iterator(level);
+    while (bi.hasNext()) {
+      Block next = bi.next();
+      if (block.equals(next)) {
+        return;
+      }
+    }
+    fail("Block " + block + " not found in level " + level);
+  }
+}