HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5411dc55
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5411dc55
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5411dc55

Branch: refs/heads/HDFS-7966
Commit: 5411dc559d5f73e4153e76fdff94a26869c17a37
Parents: 63020c5
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Thu Oct 15 18:07:09 2015 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Thu Oct 15 18:07:09 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../server/blockmanagement/BlockManager.java    |  35 +++-
 .../blockmanagement/DecommissionManager.java    |   2 +-
 .../server/blockmanagement/NumberReplicas.java  |  18 +-
 .../blockmanagement/UnderReplicatedBlocks.java  | 205 +++++++------------
 .../blockmanagement/TestReplicationPolicy.java  |  71 +++----
 .../TestUnderReplicatedBlockQueues.java         |  14 +-
 7 files changed, 146 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cddb340..a6dc78f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1531,6 +1531,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
 
+    HDFS-9205. Do not schedule corrupt blocks for replication. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4185220..c7dbbd5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -785,6 +785,7 @@ public class BlockManager implements BlockStatsMXBean {
 
       // Remove block from replication queue.
       NumberReplicas replicas = countNodes(lastBlock);
       neededReplications.remove(lastBlock, replicas.liveReplicas(),
+          replicas.readOnlyReplicas(),
           replicas.decommissionedAndDecommissioning(), getReplication(lastBlock));
       pendingReplications.remove(lastBlock);
@@ -1795,6 +1796,7 @@ public class BlockManager implements BlockStatsMXBean {
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
     int live = 0;
+    int readonly = 0;
     int decommissioned = 0;
     int decommissioning = 0;
     int corrupt = 0;
@@ -1820,6 +1822,9 @@ public class BlockManager implements BlockStatsMXBean {
         nodesContainingLiveReplicas.add(storage);
         live += countableReplica;
       }
+      if (storage.getState() == State.READ_ONLY_SHARED) {
+        readonly++;
+      }
       containingNodes.add(node);
       // Check if this replica is corrupt
       // If so, do not select the node as src node
@@ -1858,7 +1863,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
+      numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
           excess, 0);
     return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
@@ -1883,7 +1888,7 @@ public class BlockManager implements BlockStatsMXBean {
           }
           NumberReplicas num = countNodes(timedOutItems[i]);
           if (isNeededReplication(bi, num.liveReplicas())) {
-            neededReplications.add(bi, num.liveReplicas(),
+            neededReplications.add(bi, num.liveReplicas(), num.readOnlyReplicas(),
                 num.decommissionedAndDecommissioning(), getReplication(bi));
           }
         }
@@ -2799,6 +2804,7 @@ public class BlockManager implements BlockStatsMXBean {
     short fileReplication = getExpectedReplicaNum(storedBlock);
     if (!isNeededReplication(storedBlock, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
+          num.readOnlyReplicas(),
           num.decommissionedAndDecommissioning(), fileReplication);
     } else {
       updateNeededReplications(storedBlock, curReplicaDelta, 0);
@@ -3043,8 +3049,8 @@ public class BlockManager implements BlockStatsMXBean {
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
     if (isNeededReplication(block, numCurrentReplica)) {
-      if (neededReplications.add(block, numCurrentReplica, num
-          .decommissionedAndDecommissioning(), expectedReplication)) {
+      if (neededReplications.add(block, numCurrentReplica, num.readOnlyReplicas(),
+          num.decommissionedAndDecommissioning(), expectedReplication)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
     }
@@ -3583,15 +3589,22 @@ public class BlockManager implements BlockStatsMXBean {
    * For a striped block, this includes nodes storing blocks belonging to the
    * striped block group.
    */
-  public NumberReplicas countNodes(BlockInfo b) {
+  public NumberReplicas countNodes(Block b) {
     int decommissioned = 0;
     int decommissioning = 0;
     int live = 0;
+    int readonly = 0;
     int corrupt = 0;
     int excess = 0;
     int stale = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      if (storage.getState() == State.FAILED) {
+        continue;
+      } else if (storage.getState() == State.READ_ONLY_SHARED) {
+        readonly++;
+        continue;
+      }
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
@@ -3612,7 +3625,8 @@ public class BlockManager implements BlockStatsMXBean {
         stale++;
       }
     }
-    return new NumberReplicas(live, decommissioned, decommissioning, corrupt, excess, stale);
+    return new NumberReplicas(live, readonly, decommissioned, decommissioning,
+        corrupt, excess, stale);
   }
 
   /**
@@ -3765,13 +3779,13 @@ public class BlockManager implements BlockStatsMXBean {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
       if (isNeededReplication(block, repl.liveReplicas())) {
-        neededReplications.update(block, repl.liveReplicas(), repl
-            .decommissionedAndDecommissioning(), curExpectedReplicas,
+        neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(),
+            repl.decommissionedAndDecommissioning(), curExpectedReplicas,
             curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas()-curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-        neededReplications.remove(block, oldReplicas,
+        neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
            repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
@@ -3792,6 +3806,7 @@ public class BlockManager implements BlockStatsMXBean {
     final int pending = pendingReplications.getNumReplicas(block);
     if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
       neededReplications.add(block, n.liveReplicas() + pending,
+          n.readOnlyReplicas(),
           n.decommissionedAndDecommissioning(), expected);
     } else if (n.liveReplicas() > expected) {
       processOverReplicatedBlock(block, expected, null, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 1f1ae09..42810350 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -545,7 +545,7 @@ public class DecommissionManager {
           blockManager.isPopulatingReplQueues()) {
         // Process these blocks only when active NN is out of safe mode.
         blockManager.neededReplications.add(block,
-            liveReplicas,
+            liveReplicas, num.readOnlyReplicas(),
             num.decommissionedAndDecommissioning(),
             blockManager.getExpectedReplicaNum(block));
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index e567bbf..44ae6f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
  */
 public class NumberReplicas {
   private int liveReplicas;
+  private int readOnlyReplicas;
 
   // Tracks only the decommissioning replicas
   private int decommissioning;
@@ -33,17 +34,18 @@ public class NumberReplicas {
   private int replicasOnStaleNodes;
 
   NumberReplicas() {
-    initialize(0, 0, 0, 0, 0, 0);
+    this(0, 0, 0, 0, 0, 0, 0);
   }
 
-  NumberReplicas(int live, int decommissioned, int decommissioning, int corrupt,
-      int excess, int stale) {
-    initialize(live, decommissioned, decommissioning, corrupt, excess, stale);
+  NumberReplicas(int live, int readonly, int decommissioned,
+      int decommissioning, int corrupt, int excess, int stale) {
+    set(live, readonly, decommissioned, decommissioning, corrupt, excess, stale);
   }
 
-  void initialize(int live, int decommissioned, int decommissioning,
-      int corrupt, int excess, int stale) {
+  void set(int live, int readonly, int decommissioned, int decommissioning,
+      int corrupt, int excess, int stale) {
     liveReplicas = live;
+    readOnlyReplicas = readonly;
     this.decommissioning = decommissioning;
     this.decommissioned = decommissioned;
     corruptReplicas = corrupt;
@@ -55,6 +57,10 @@ public class NumberReplicas {
     return liveReplicas;
   }
 
+  public int readOnlyReplicas() {
+    return readOnlyReplicas;
+  }
+
   /**
    *
    * @return decommissioned replicas + decommissioning replicas

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 7e8f479..d4938c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 
 /**
  * Keep prioritized queues of under replicated blocks.
@@ -34,7 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
  * </p>
 * <p>The queue order is as follows:</p>
 * <ol>
@@ -147,6 +149,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    */
   private int getPriority(BlockInfo block,
                           int curReplicas,
+                          int readOnlyReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
@@ -159,19 +162,24 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
       return getPriorityStriped(curReplicas, decommissionedReplicas,
           sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
     } else {
-      return getPriorityContiguous(curReplicas, decommissionedReplicas,
-          expectedReplicas);
+      return getPriorityContiguous(curReplicas, readOnlyReplicas,
+          decommissionedReplicas, expectedReplicas);
     }
   }
 
-  private int getPriorityContiguous(int curReplicas, int decommissionedReplicas,
-      int expectedReplicas) {
+  private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
+      int decommissionedReplicas, int expectedReplicas) {
     if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
         return QUEUE_HIGHEST_PRIORITY;
       }
+      if (readOnlyReplicas > 0) {
+        // only has read-only replicas, highest risk
+        // since the read-only replicas may go down all together.
+        return QUEUE_HIGHEST_PRIORITY;
+      }
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
@@ -218,11 +226,12 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    */
   synchronized boolean add(BlockInfo block,
                            int curReplicas,
+                           int readOnlyReplicas,
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
-        expectedReplicas);
+    final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
+        decomissionedReplicas, expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
           expectedReplicas == 1) {
@@ -242,11 +251,11 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
   /** remove a block from a under replication queue */
   synchronized boolean remove(BlockInfo block,
                               int oldReplicas,
+                              int oldReadOnlyReplicas,
                               int decommissionedReplicas,
                               int oldExpectedReplicas) {
-    int priLevel = getPriority(block, oldReplicas,
-                               decommissionedReplicas,
-                               oldExpectedReplicas);
+    final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
     if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
         oldExpectedReplicas == 1 &&
@@ -285,10 +294,10 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
       // Try to remove the block from all queues if the block was
       // not found in the queue for the given priority level.
       for (int i = 0; i < LEVEL; i++) {
-        if (priorityQueues.get(i).remove(block)) {
+        if (i != priLevel && priorityQueues.get(i).remove(block)) {
           NameNode.blockStateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" +
-                  " {} from priority queue {}", block, priLevel);
+                  " {} from priority queue {}", block, i);
           return true;
         }
       }
@@ -313,15 +322,15 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    * @param expectedReplicasDelta the change in the expected replica count from before
    */
   synchronized void update(BlockInfo block, int curReplicas,
-                           int decommissionedReplicas,
+                           int readOnlyReplicas, int decommissionedReplicas,
                            int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
-        curExpectedReplicas);
-    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
-        oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, readOnlyReplicas,
+        decommissionedReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + block +
@@ -371,143 +380,69 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    * @return Return a list of block lists to be replicated. The block list index
    *         represents its replication priority.
    */
-  public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
+  synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
       int blocksToProcess) {
-    // initialize data structure for the return value
-    List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
-    for (int i = 0; i < LEVEL; i++) {
-      blocksToReplicate.add(new ArrayList<BlockInfo>());
-    }
-
-    if (size() == 0) { // There are no blocks to collect.
-      return blocksToReplicate;
-    }
+    final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
 
-    int blockCount = 0;
-    for (int priority = 0; priority < LEVEL; priority++) {
+    int count = 0;
+    int priority = 0;
+    for (; count < blocksToProcess && priority < LEVEL; priority++) {
+      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
+        // do not choose corrupted blocks.
+        continue;
+      }
+
       // Go through all blocks that need replications with current priority.
-      BlockIterator neededReplicationsIterator = iterator(priority);
       // Set the iterator to the first unprocessed block at this priority level.
-      neededReplicationsIterator.setToBookmark();
-
-      blocksToProcess = Math.min(blocksToProcess, size());
-
-      if (blockCount == blocksToProcess) {
-        break; // break if already expected blocks are obtained
-      }
-
+      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
+      final List<BlockInfo> blocks = new LinkedList<>();
+      blocksToReplicate.add(blocks);
       // Loop through all remaining blocks in the list.
-      while (blockCount < blocksToProcess
-          && neededReplicationsIterator.hasNext()) {
-        BlockInfo block = neededReplicationsIterator.next();
-        blocksToReplicate.get(priority).add(block);
-        blockCount++;
+      for(; count < blocksToProcess && i.hasNext(); count++) {
+        blocks.add(i.next());
       }
-
-      if (!neededReplicationsIterator.hasNext()
-          && neededReplicationsIterator.getPriority() == LEVEL - 1) {
-        // Reset all priorities' bookmarks to the beginning because there were
-        // no recently added blocks in any list.
-        for (int i = 0; i < LEVEL; i++) {
-          this.priorityQueues.get(i).resetBookmark();
-        }
-        break;
+    }
+
+    if (priority == LEVEL) {
+      // Reset all bookmarks because there were no recently added blocks.
+      for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
+        q.resetBookmark();
       }
     }
+
     return blocksToReplicate;
   }
 
   /** returns an iterator of all blocks in a given priority queue */
-  synchronized BlockIterator iterator(int level) {
-    return new BlockIterator(level);
+  synchronized Iterator<BlockInfo> iterator(int level) {
+    return priorityQueues.get(level).iterator();
   }
 
   /** return an iterator of all the under replication blocks */
   @Override
-  public synchronized BlockIterator iterator() {
-    return new BlockIterator();
-  }
-
-  /**
-   * An iterator over blocks.
-   */
-  class BlockIterator implements Iterator<BlockInfo> {
-    private int level;
-    private boolean isIteratorForLevel = false;
-    private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
-
-    /**
-     * Construct an iterator over all queues.
-     */
-    private BlockIterator() {
-      level=0;
-      for(int i=0; i<LEVEL; i++) {
-        iterators.add(priorityQueues.get(i).iterator());
-      }
-    }
-
-    /**
-     * Constrict an iterator for a single queue level
-     * @param l the priority level to iterate over
-     */
-    private BlockIterator(int l) {
-      level = l;
-      isIteratorForLevel = true;
-      iterators.add(priorityQueues.get(level).iterator());
-    }
-
-    private void update() {
-      if (isIteratorForLevel) {
-        return;
-      }
-      while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
-        level++;
-      }
-    }
-
-    @Override
-    public BlockInfo next() {
-      if (isIteratorForLevel) {
-        return iterators.get(0).next();
+  public synchronized Iterator<BlockInfo> iterator() {
+    final Iterator<LightWeightLinkedSet<BlockInfo>> q = priorityQueues.iterator();
+    return new Iterator<BlockInfo>() {
+      private Iterator<BlockInfo> b = q.next().iterator();
+
+      @Override
+      public BlockInfo next() {
+        hasNext();
+        return b.next();
       }
-      update();
-      return iterators.get(level).next();
-    }
 
     @Override
-    public boolean hasNext() {
-      if (isIteratorForLevel) {
-        return iterators.get(0).hasNext();
-      }
-      update();
-      return iterators.get(level).hasNext();
-    }
-
-    @Override
-    public void remove() {
-      if (isIteratorForLevel) {
-        iterators.get(0).remove();
-      } else {
-        iterators.get(level).remove();
+      @Override
+      public boolean hasNext() {
+        for(; !b.hasNext() && q.hasNext(); ) {
+          b = q.next().iterator();
+        }
+        return b.hasNext();
       }
-    }
-
-    int getPriority() {
-      return level;
-    }
-
-    /**
-     * Sets iterator(s) to bookmarked elements.
-     */
-    private synchronized void setToBookmark() {
-      if (this.isIteratorForLevel) {
-        this.iterators.set(0, priorityQueues.get(this.level)
-            .getBookmark());
-      } else {
-        for(int i=0; i<LEVEL; i++) {
-          this.iterators.set(i, priorityQueues.get(i).getBookmark());
-        }
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
       }
-    }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index fa9cc5c..a0adc60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -836,7 +836,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
       // Adding the blocks directly to normal priority
       neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 2, 0, 3);
+          nextLong()), 2, 0, 0, 3);
     }
     // Lets wait for the replication interval, to start process normal
     // priority blocks
@@ -844,7 +844,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     // Adding the block directly to high priority list
     neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
-        nextLong()), 1, 0, 3);
+        nextLong()), 1, 0, 0, 3);
 
     // Lets wait for the replication interval
     Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
@@ -868,23 +868,23 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     for (int i = 0; i < 5; i++) {
       // Adding QUEUE_HIGHEST_PRIORITY block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 1, 0, 3);
+          nextLong()), 1, 0, 0, 3);
 
       // Adding QUEUE_VERY_UNDER_REPLICATED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 2, 0, 7);
+          nextLong()), 2, 0, 0, 7);
 
       // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 6, 0, 6);
+          nextLong()), 6, 0, 0, 6);
 
       // Adding QUEUE_UNDER_REPLICATED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 5, 0, 6);
+          nextLong()), 5, 0, 0, 6);
 
       // Adding QUEUE_WITH_CORRUPT_BLOCKS block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 0, 0, 3);
+          nextLong()), 0, 0, 0, 3);
     }
 
     // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
@@ -902,13 +902,12 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     // Adding QUEUE_HIGHEST_PRIORITY
     underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-        nextLong()), 1, 0, 3);
+        nextLong()), 0, 1, 0, 3);
 
     // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
     // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
-    // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
-    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
 
     // Since it is reached to end of all lists,
     // should start picking the blocks from start.
@@ -920,29 +919,15 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
 
   /** asserts the chosen blocks with expected priority blocks */
   private void assertTheChosenBlocks(
-      List<List<BlockInfo>> chosenBlocks, int firstPrioritySize,
-      int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
-      int fifthPrioritySize) {
-    assertEquals(
-        "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
-        firstPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
-        secondPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
-        thirdPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
-        fourthPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
-        fifthPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+      List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
+    int i = 0;
+    for(; i < chosenBlocks.size(); i++) {
+      assertEquals("Not returned the expected number for i=" + i,
+          expectedSizes[i], chosenBlocks.get(i).size());
+    }
+    for(; i < expectedSizes.length; i++) {
+      assertEquals("Expected size is non-zero for i=" + i, 0, expectedSizes[i]);
+    }
   }
 
   /**
@@ -1101,14 +1086,14 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     // Adding QUEUE_VERY_UNDER_REPLICATED block
     final int block1CurReplicas = 2;
     final int block1ExpectedReplicas = 7;
-    underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+    underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
         block1ExpectedReplicas);
 
     // Adding QUEUE_VERY_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 2, 0, 7);
+    underReplicatedBlocks.add(block2, 2, 0, 0, 7);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block3, 2, 0, 6);
+    underReplicatedBlocks.add(block3, 2, 0, 0, 6);
 
     List<List<BlockInfo>> chosenBlocks;
@@ -1119,7 +1104,7 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     // Increasing the replications will move the block down a
     // priority. This simulates a replica being completed in between checks.
-    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
        block1ExpectedReplicas, 1, 0);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
@@ -1147,10 +1132,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;
@@ -1205,10 +1190,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(blkID2);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;
@@ -1268,10 +1253,10 @@ public class TestReplicationPolicy extends BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index 7cd2e19..3ad45df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -64,7 +66,7 @@ public class TestUnderReplicatedBlockQueues {
     assertEquals(1, queues.size());
     assertInLevel(queues, block1, UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
     //repeated additions fail
-    assertFalse(queues.add(block1, 1, 0, 3));
+    assertFalse(queues.add(block1, 1, 0, 0, 3));
 
     //add a second block with two replicas
     assertAdded(queues, block2, 2, 0, 3);
@@ -88,11 +90,11 @@ public class TestUnderReplicatedBlockQueues {
     assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
     assertEquals(2, queues.getCorruptBlockSize());
     assertEquals(1, queues.getCorruptReplOneBlockSize());
-    queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
+    queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
     assertEquals(0, queues.getCorruptReplOneBlockSize());
-    queues.update(block_corrupt, 0, 0, 1, 0, -2);
+    queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
     assertEquals(1, queues.getCorruptReplOneBlockSize());
-    queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
+    queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
     assertEquals(2, queues.getCorruptReplOneBlockSize());
   }
 
@@ -151,7 +153,7 @@
                            int expectedReplicas) {
     assertTrue("Failed to add " + block,
         queues.add(block,
-            curReplicas,
+            curReplicas, 0,
             decomissionedReplicas,
             expectedReplicas));
   }
@@ -169,7 +171,7 @@
   private void assertInLevel(UnderReplicatedBlocks queues,
                              Block block,
                              int level) {
-    UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
+    final Iterator<BlockInfo> bi = queues.iterator(level);
     while (bi.hasNext()) {
       Block next = bi.next();
      if (block.equals(next)) {
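
----------------------------------------------------------------------
The behavioral core of the patch can be illustrated with a small standalone sketch. This is not the Hadoop code: the class and method names (PriorityPicker, priority(), choose()) are hypothetical simplifications of UnderReplicatedBlocks#getPriorityContiguous and #chooseUnderReplicatedBlocks, and the expected-replication and striped-block handling is omitted. It shows the two changes above: a block whose only remaining replicas are read-only is queued at the highest priority, and the corrupt-block queue is skipped when choosing replication work.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative sketch only -- a simplified stand-in for UnderReplicatedBlocks. */
class PriorityPicker {
  static final int QUEUE_HIGHEST_PRIORITY = 0;
  static final int QUEUE_UNDER_REPLICATED = 2;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
  static final int LEVEL = 5;

  private final List<Queue<Long>> queues = new ArrayList<>();

  PriorityPicker() {
    for (int i = 0; i < LEVEL; i++) {
      queues.add(new ArrayDeque<Long>());
    }
  }

  // In the spirit of getPriorityContiguous(): with zero live replicas, a
  // decommissioned or read-only copy still makes the block recoverable, so it
  // goes to the highest-priority queue; with nothing usable left it is corrupt.
  int priority(int live, int readOnly, int decommissioned) {
    if (live == 0) {
      return (decommissioned > 0 || readOnly > 0)
          ? QUEUE_HIGHEST_PRIORITY : QUEUE_WITH_CORRUPT_BLOCKS;
    }
    return QUEUE_UNDER_REPLICATED; // simplified: a single middle bucket
  }

  void add(long blockId, int live, int readOnly, int decommissioned) {
    queues.get(priority(live, readOnly, decommissioned)).add(blockId);
  }

  // In the spirit of chooseUnderReplicatedBlocks(): walk the queues in priority
  // order, but never hand out blocks from the corrupt-block queue.
  List<Long> choose(int blocksToProcess) {
    List<Long> chosen = new ArrayList<>();
    for (int p = 0; p < LEVEL && chosen.size() < blocksToProcess; p++) {
      if (p == QUEUE_WITH_CORRUPT_BLOCKS) {
        continue; // corrupt blocks are not scheduled for replication
      }
      Queue<Long> q = queues.get(p);
      while (chosen.size() < blocksToProcess && !q.isEmpty()) {
        chosen.add(q.poll());
      }
    }
    return chosen;
  }

  public static void main(String[] args) {
    PriorityPicker picker = new PriorityPicker();
    picker.add(1L, 0, 1, 0); // only a read-only replica -> highest priority
    picker.add(2L, 0, 0, 0); // only corrupt replicas -> never chosen
    picker.add(3L, 2, 0, 0); // ordinary under-replication
    System.out.println(picker.choose(10)); // prints [1, 3]; block 2 is skipped
  }
}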