HDFS-9205. Do not schedule corrupt blocks for replication.  (szetszwo)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5411dc55
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5411dc55
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5411dc55

Branch: refs/heads/HDFS-7966
Commit: 5411dc559d5f73e4153e76fdff94a26869c17a37
Parents: 63020c5
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Thu Oct 15 18:07:09 2015 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Thu Oct 15 18:07:09 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../server/blockmanagement/BlockManager.java    |  35 +++-
 .../blockmanagement/DecommissionManager.java    |   2 +-
 .../server/blockmanagement/NumberReplicas.java  |  18 +-
 .../blockmanagement/UnderReplicatedBlocks.java  | 205 +++++++------------
 .../blockmanagement/TestReplicationPolicy.java  |  71 +++----
 .../TestUnderReplicatedBlockQueues.java         |  14 +-
 7 files changed, 146 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt 
b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index cddb340..a6dc78f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1531,6 +1531,8 @@ Release 2.8.0 - UNRELEASED
 
     HDFS-9188. Make block corruption related tests FsDataset-agnostic. (lei)
 
+    HDFS-9205. Do not schedule corrupt blocks for replication.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4185220..c7dbbd5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -785,6 +785,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Remove block from replication queue.
     NumberReplicas replicas = countNodes(lastBlock);
     neededReplications.remove(lastBlock, replicas.liveReplicas(),
+        replicas.readOnlyReplicas(),
         replicas.decommissionedAndDecommissioning(), 
getReplication(lastBlock));
     pendingReplications.remove(lastBlock);
 
@@ -1795,6 +1796,7 @@ public class BlockManager implements BlockStatsMXBean {
     nodesContainingLiveReplicas.clear();
     List<DatanodeDescriptor> srcNodes = new ArrayList<>();
     int live = 0;
+    int readonly = 0;
     int decommissioned = 0;
     int decommissioning = 0;
     int corrupt = 0;
@@ -1820,6 +1822,9 @@ public class BlockManager implements BlockStatsMXBean {
         nodesContainingLiveReplicas.add(storage);
         live += countableReplica;
       }
+      if (storage.getState() == State.READ_ONLY_SHARED) {
+        readonly++;
+      }
       containingNodes.add(node);
       // Check if this replica is corrupt
       // If so, do not select the node as src node
@@ -1858,7 +1863,7 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     if(numReplicas != null)
-      numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
+      numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
           excess, 0);
     return srcNodes.toArray(new DatanodeDescriptor[srcNodes.size()]);
   }
@@ -1883,7 +1888,7 @@ public class BlockManager implements BlockStatsMXBean {
           }
           NumberReplicas num = countNodes(timedOutItems[i]);
           if (isNeededReplication(bi, num.liveReplicas())) {
-            neededReplications.add(bi, num.liveReplicas(),
+            neededReplications.add(bi, num.liveReplicas(), 
num.readOnlyReplicas(),
                 num.decommissionedAndDecommissioning(), getReplication(bi));
           }
         }
@@ -2799,6 +2804,7 @@ public class BlockManager implements BlockStatsMXBean {
     short fileReplication = getExpectedReplicaNum(storedBlock);
     if (!isNeededReplication(storedBlock, numCurrentReplica)) {
       neededReplications.remove(storedBlock, numCurrentReplica,
+          num.readOnlyReplicas(),
           num.decommissionedAndDecommissioning(), fileReplication);
     } else {
       updateNeededReplications(storedBlock, curReplicaDelta, 0);
@@ -3043,8 +3049,8 @@ public class BlockManager implements BlockStatsMXBean {
     int numCurrentReplica = num.liveReplicas();
     // add to under-replicated queue if need to be
     if (isNeededReplication(block, numCurrentReplica)) {
-      if (neededReplications.add(block, numCurrentReplica, num
-          .decommissionedAndDecommissioning(), expectedReplication)) {
+      if (neededReplications.add(block, numCurrentReplica, 
num.readOnlyReplicas(),
+          num.decommissionedAndDecommissioning(), expectedReplication)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
     }
@@ -3583,15 +3589,22 @@ public class BlockManager implements BlockStatsMXBean {
    * For a striped block, this includes nodes storing blocks belonging to the
    * striped block group.
    */
-  public NumberReplicas countNodes(BlockInfo b) {
+  public NumberReplicas countNodes(Block b) {
     int decommissioned = 0;
     int decommissioning = 0;
     int live = 0;
+    int readonly = 0;
     int corrupt = 0;
     int excess = 0;
     int stale = 0;
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(b, State.NORMAL)) {
+    for(DatanodeStorageInfo storage : blocksMap.getStorages(b)) {
+      if (storage.getState() == State.FAILED) {
+        continue;
+      } else if (storage.getState() == State.READ_ONLY_SHARED) {
+        readonly++;
+        continue;
+      }
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node))) {
         corrupt++;
@@ -3612,7 +3625,8 @@ public class BlockManager implements BlockStatsMXBean {
         stale++;
       }
     }
-    return new NumberReplicas(live, decommissioned, decommissioning, corrupt, 
excess, stale);
+    return new NumberReplicas(live, readonly, decommissioned, decommissioning,
+        corrupt, excess, stale);
   }
 
   /** 
@@ -3765,13 +3779,13 @@ public class BlockManager implements BlockStatsMXBean {
       NumberReplicas repl = countNodes(block);
       int curExpectedReplicas = getReplication(block);
       if (isNeededReplication(block, repl.liveReplicas())) {
-        neededReplications.update(block, repl.liveReplicas(), repl
-            .decommissionedAndDecommissioning(), curExpectedReplicas,
+        neededReplications.update(block, repl.liveReplicas(), 
repl.readOnlyReplicas(),
+            repl.decommissionedAndDecommissioning(), curExpectedReplicas,
             curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas()-curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-        neededReplications.remove(block, oldReplicas,
+        neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(),
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
       }
     } finally {
@@ -3792,6 +3806,7 @@ public class BlockManager implements BlockStatsMXBean {
       final int pending = pendingReplications.getNumReplicas(block);
       if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
         neededReplications.add(block, n.liveReplicas() + pending,
+            n.readOnlyReplicas(),
             n.decommissionedAndDecommissioning(), expected);
       } else if (n.liveReplicas() > expected) {
         processOverReplicatedBlock(block, expected, null, null);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index 1f1ae09..42810350 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -545,7 +545,7 @@ public class DecommissionManager {
               blockManager.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReplications.add(block,
-                liveReplicas,
+                liveReplicas, num.readOnlyReplicas(),
                 num.decommissionedAndDecommissioning(),
                 blockManager.getExpectedReplicaNum(block));
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
index e567bbf..44ae6f6 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
@@ -23,6 +23,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
  */
 public class NumberReplicas {
   private int liveReplicas;
+  private int readOnlyReplicas;
 
   // Tracks only the decommissioning replicas
   private int decommissioning;
@@ -33,17 +34,18 @@ public class NumberReplicas {
   private int replicasOnStaleNodes;
 
   NumberReplicas() {
-    initialize(0, 0, 0, 0, 0, 0);
+    this(0, 0, 0, 0, 0, 0, 0);
   }
 
-  NumberReplicas(int live, int decommissioned, int decommissioning, int 
corrupt,
-                 int excess, int stale) {
-    initialize(live, decommissioned, decommissioning, corrupt, excess, stale);
+  NumberReplicas(int live, int readonly, int decommissioned,
+      int decommissioning, int corrupt, int excess, int stale) {
+    set(live, readonly, decommissioned, decommissioning, corrupt, excess, 
stale);
   }
 
-  void initialize(int live, int decommissioned, int decommissioning,
-                  int corrupt, int excess, int stale) {
+  void set(int live, int readonly, int decommissioned, int decommissioning,
+      int corrupt, int excess, int stale) {
     liveReplicas = live;
+    readOnlyReplicas = readonly;
     this.decommissioning = decommissioning;
     this.decommissioned = decommissioned;
     corruptReplicas = corrupt;
@@ -55,6 +57,10 @@ public class NumberReplicas {
     return liveReplicas;
   }
 
+  public int readOnlyReplicas() {
+    return readOnlyReplicas;
+  }
+
   /**
    *
    * @return decommissioned replicas + decommissioning replicas

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 7e8f479..d4938c5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
-import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
+
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 
 /**
  * Keep prioritized queues of under replicated blocks.
@@ -34,7 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
  *
  * <p/>
  * The policy for choosing which priority to give added blocks
- * is implemented in {@link #getPriority(BlockInfo, int, int, int)}.
+ * is implemented in {@link #getPriority(BlockInfo, int, int, int, int)}.
  * </p>
  * <p>The queue order is as follows:</p>
  * <ol>
@@ -147,6 +149,7 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
    */
   private int getPriority(BlockInfo block,
                           int curReplicas,
+                          int readOnlyReplicas,
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
@@ -159,19 +162,24 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
       return getPriorityStriped(curReplicas, decommissionedReplicas,
           sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
     } else {
-      return getPriorityContiguous(curReplicas, decommissionedReplicas,
-          expectedReplicas);
+      return getPriorityContiguous(curReplicas, readOnlyReplicas,
+          decommissionedReplicas, expectedReplicas);
     }
   }
 
-  private int getPriorityContiguous(int curReplicas, int 
decommissionedReplicas,
-      int expectedReplicas) {
+  private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
+      int decommissionedReplicas, int expectedReplicas) {
     if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
         return QUEUE_HIGHEST_PRIORITY;
       }
+      if (readOnlyReplicas > 0) {
+        // only has read-only replicas, highest risk
+        // since the read-only replicas may go down all together.
+        return QUEUE_HIGHEST_PRIORITY;
+      }
       //all we have are corrupt blocks
       return QUEUE_WITH_CORRUPT_BLOCKS;
     } else if (curReplicas == 1) {
@@ -218,11 +226,12 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
    */
   synchronized boolean add(BlockInfo block,
                            int curReplicas,
+                           int readOnlyReplicas,
                            int decomissionedReplicas,
                            int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
-    int priLevel = getPriority(block, curReplicas, decomissionedReplicas,
-                               expectedReplicas);
+    final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
+        decomissionedReplicas, expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
           expectedReplicas == 1) {
@@ -242,11 +251,11 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
   /** remove a block from a under replication queue */
   synchronized boolean remove(BlockInfo block,
                               int oldReplicas,
+                              int oldReadOnlyReplicas,
                               int decommissionedReplicas,
                               int oldExpectedReplicas) {
-    int priLevel = getPriority(block, oldReplicas,
-                               decommissionedReplicas,
-                               oldExpectedReplicas);
+    final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
     if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
         oldExpectedReplicas == 1 &&
@@ -285,10 +294,10 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
       // Try to remove the block from all queues if the block was
       // not found in the queue for the given priority level.
       for (int i = 0; i < LEVEL; i++) {
-        if (priorityQueues.get(i).remove(block)) {
+        if (i != priLevel && priorityQueues.get(i).remove(block)) {
           NameNode.blockStateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: Removing block" 
+
-                  " {} from priority queue {}", block, priLevel);
+                  " {} from priority queue {}", block, i);
           return true;
         }
       }
@@ -313,15 +322,15 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
    * @param expectedReplicasDelta the change in the expected replica count 
from before
    */
   synchronized void update(BlockInfo block, int curReplicas,
-                           int decommissionedReplicas,
+                           int readOnlyReplicas, int decommissionedReplicas,
                            int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
-    int curPri = getPriority(block, curReplicas, decommissionedReplicas,
-        curExpectedReplicas);
-    int oldPri = getPriority(block, oldReplicas, decommissionedReplicas,
-        oldExpectedReplicas);
+    int curPri = getPriority(block, curReplicas, readOnlyReplicas,
+        decommissionedReplicas, curExpectedReplicas);
+    int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
+        decommissionedReplicas, oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("UnderReplicationBlocks.update " + 
         block +
@@ -371,143 +380,69 @@ class UnderReplicatedBlocks implements 
Iterable<BlockInfo> {
    * @return Return a list of block lists to be replicated. The block list 
index
    *         represents its replication priority.
    */
-  public synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
+  synchronized List<List<BlockInfo>> chooseUnderReplicatedBlocks(
       int blocksToProcess) {
-    // initialize data structure for the return value
-    List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
-    for (int i = 0; i < LEVEL; i++) {
-      blocksToReplicate.add(new ArrayList<BlockInfo>());
-    }
-
-    if (size() == 0) { // There are no blocks to collect.
-      return blocksToReplicate;
-    }
+    final List<List<BlockInfo>> blocksToReplicate = new ArrayList<>(LEVEL);
     
-    int blockCount = 0;
-    for (int priority = 0; priority < LEVEL; priority++) { 
+    int count = 0;
+    int priority = 0;
+    for (; count < blocksToProcess && priority < LEVEL; priority++) {
+      if (priority == QUEUE_WITH_CORRUPT_BLOCKS) {
+        // do not choose corrupted blocks.
+        continue;
+      }
+
       // Go through all blocks that need replications with current priority.
-      BlockIterator neededReplicationsIterator = iterator(priority);
       // Set the iterator to the first unprocessed block at this priority 
level.
-      neededReplicationsIterator.setToBookmark();
-
-      blocksToProcess = Math.min(blocksToProcess, size());
-      
-      if (blockCount == blocksToProcess) {
-        break;  // break if already expected blocks are obtained
-      }
-      
+      final Iterator<BlockInfo> i = priorityQueues.get(priority).getBookmark();
+      final List<BlockInfo> blocks = new LinkedList<>();
+      blocksToReplicate.add(blocks);
       // Loop through all remaining blocks in the list.
-      while (blockCount < blocksToProcess
-          && neededReplicationsIterator.hasNext()) {
-        BlockInfo block = neededReplicationsIterator.next();
-        blocksToReplicate.get(priority).add(block);
-        blockCount++;
+      for(; count < blocksToProcess && i.hasNext(); count++) {
+        blocks.add(i.next());
       }
-      
-      if (!neededReplicationsIterator.hasNext()
-          && neededReplicationsIterator.getPriority() == LEVEL - 1) {
-        // Reset all priorities' bookmarks to the beginning because there were
-        // no recently added blocks in any list.
-        for (int i = 0; i < LEVEL; i++) {
-          this.priorityQueues.get(i).resetBookmark();
-        }
-        break;
+    }
+
+    if (priority == LEVEL) {
+      // Reset all bookmarks because there were no recently added blocks.
+      for (LightWeightLinkedSet<BlockInfo> q : priorityQueues) {
+        q.resetBookmark();
       }
     }
+
     return blocksToReplicate;
   }
 
   /** returns an iterator of all blocks in a given priority queue */
-  synchronized BlockIterator iterator(int level) {
-    return new BlockIterator(level);
+  synchronized Iterator<BlockInfo> iterator(int level) {
+    return priorityQueues.get(level).iterator();
   }
 
   /** return an iterator of all the under replication blocks */
   @Override
-  public synchronized BlockIterator iterator() {
-    return new BlockIterator();
-  }
-
-  /**
-   * An iterator over blocks.
-   */
-  class BlockIterator implements Iterator<BlockInfo> {
-    private int level;
-    private boolean isIteratorForLevel = false;
-    private final List<Iterator<BlockInfo>> iterators = new ArrayList<>();
-
-    /**
-     * Construct an iterator over all queues.
-     */
-    private BlockIterator() {
-      level=0;
-      for(int i=0; i<LEVEL; i++) {
-        iterators.add(priorityQueues.get(i).iterator());
-      }
-    }
-
-    /**
-     * Constrict an iterator for a single queue level
-     * @param l the priority level to iterate over
-     */
-    private BlockIterator(int l) {
-      level = l;
-      isIteratorForLevel = true;
-      iterators.add(priorityQueues.get(level).iterator());
-    }
-
-    private void update() {
-      if (isIteratorForLevel) {
-        return;
-      }
-      while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
-        level++;
-      }
-    }
-
-    @Override
-    public BlockInfo next() {
-      if (isIteratorForLevel) {
-        return iterators.get(0).next();
+  public synchronized Iterator<BlockInfo> iterator() {
+    final Iterator<LightWeightLinkedSet<BlockInfo>> q = 
priorityQueues.iterator();
+    return new Iterator<BlockInfo>() {
+      private Iterator<BlockInfo> b = q.next().iterator();
+
+      @Override
+      public BlockInfo next() {
+        hasNext();
+        return b.next();
       }
-      update();
-      return iterators.get(level).next();
-    }
 
-    @Override
-    public boolean hasNext() {
-      if (isIteratorForLevel) {
-        return iterators.get(0).hasNext();
-      }
-      update();
-      return iterators.get(level).hasNext();
-    }
-
-    @Override
-    public void remove() {
-      if (isIteratorForLevel) {
-        iterators.get(0).remove();
-      } else {
-        iterators.get(level).remove();
+      @Override
+      public boolean hasNext() {
+        for(; !b.hasNext() && q.hasNext(); ) {
+          b = q.next().iterator();
+        }
+        return b.hasNext();
       }
-    }
-
-    int getPriority() {
-      return level;
-    }
 
-    /**
-     * Sets iterator(s) to bookmarked elements.
-     */
-    private synchronized void setToBookmark() {
-      if (this.isIteratorForLevel) {
-        this.iterators.set(0, priorityQueues.get(this.level)
-            .getBookmark());
-      } else {
-        for(int i=0; i<LEVEL; i++) {
-          this.iterators.set(i, priorityQueues.get(i).getBookmark());
-        }
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
       }
-    }
+    };
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
index fa9cc5c..a0adc60 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
@@ -836,7 +836,7 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
         // Adding the blocks directly to normal priority
 
         neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
-            nextLong()), 2, 0, 3);
+            nextLong()), 2, 0, 0, 3);
       }
       // Lets wait for the replication interval, to start process normal
       // priority blocks
@@ -844,7 +844,7 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
       
       // Adding the block directly to high priority list
       neededReplications.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 1, 0, 3);
+          nextLong()), 1, 0, 0, 3);
 
       // Lets wait for the replication interval
       Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
@@ -868,23 +868,23 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
     for (int i = 0; i < 5; i++) {
       // Adding QUEUE_HIGHEST_PRIORITY block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 1, 0, 3);
+          nextLong()), 1, 0, 0, 3);
 
       // Adding QUEUE_VERY_UNDER_REPLICATED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 2, 0, 7);
+          nextLong()), 2, 0, 0, 7);
 
       // Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 6, 0, 6);
+          nextLong()), 6, 0, 0, 6);
 
       // Adding QUEUE_UNDER_REPLICATED block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 5, 0, 6);
+          nextLong()), 5, 0, 0, 6);
 
       // Adding QUEUE_WITH_CORRUPT_BLOCKS block
       underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-          nextLong()), 0, 0, 3);
+          nextLong()), 0, 0, 0, 3);
     }
 
     // Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
@@ -902,13 +902,12 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
 
     // Adding QUEUE_HIGHEST_PRIORITY
     underReplicatedBlocks.add(genBlockInfo(ThreadLocalRandom.current().
-        nextLong()), 1, 0, 3);
+        nextLong()), 0, 1, 0, 3);
 
     // Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 
block from
     // QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
-    // and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
     chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
-    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
+    assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4);
 
     // Since it is reached to end of all lists,
     // should start picking the blocks from start.
@@ -920,29 +919,15 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
   
   /** asserts the chosen blocks with expected priority blocks */
   private void assertTheChosenBlocks(
-      List<List<BlockInfo>> chosenBlocks, int firstPrioritySize,
-      int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
-      int fifthPrioritySize) {
-    assertEquals(
-        "Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
-        firstPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED 
blocks",
-        secondPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
-        thirdPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED 
blocks",
-        fourthPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
-    assertEquals(
-        "Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
-        fifthPrioritySize, chosenBlocks.get(
-            UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
+      List<List<BlockInfo>> chosenBlocks, int... expectedSizes) {
+    int i = 0;
+    for(; i < chosenBlocks.size(); i++) {
+      assertEquals("Not returned the expected number for i=" + i,
+          expectedSizes[i], chosenBlocks.get(i).size());
+    }
+    for(; i < expectedSizes.length; i++) {
+      assertEquals("Expected size is non-zero for i=" + i, 0, 
expectedSizes[i]);
+    }
   }
   
   /**
@@ -1101,14 +1086,14 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
     // Adding QUEUE_VERY_UNDER_REPLICATED block
     final int block1CurReplicas = 2;
     final int block1ExpectedReplicas = 7;
-    underReplicatedBlocks.add(block1, block1CurReplicas, 0,
+    underReplicatedBlocks.add(block1, block1CurReplicas, 0, 0,
         block1ExpectedReplicas);
 
     // Adding QUEUE_VERY_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 2, 0, 7);
+    underReplicatedBlocks.add(block2, 2, 0, 0, 7);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block3, 2, 0, 6);
+    underReplicatedBlocks.add(block3, 2, 0, 0, 6);
 
     List<List<BlockInfo>> chosenBlocks;
 
@@ -1119,7 +1104,7 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
 
     // Increasing the replications will move the block down a
     // priority.  This simulates a replica being completed in between checks.
-    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0,
+    underReplicatedBlocks.update(block1, block1CurReplicas+1, 0, 0,
         block1ExpectedReplicas, 1, 0);
 
     // Choose 1 block from UnderReplicatedBlocks. Then it should pick 1 block
@@ -1147,10 +1132,10 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;
 
@@ -1205,10 +1190,10 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(blkID2);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;
 
@@ -1268,10 +1253,10 @@ public class TestReplicationPolicy extends 
BaseReplicationPolicyTest {
     BlockInfo block2 = genBlockInfo(ThreadLocalRandom.current().nextLong());
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block1, 0, 1, 1);
+    underReplicatedBlocks.add(block1, 0, 0, 1, 1);
 
     // Adding QUEUE_UNDER_REPLICATED block
-    underReplicatedBlocks.add(block2, 0, 1, 1);
+    underReplicatedBlocks.add(block2, 0, 0, 1, 1);
 
     List<List<BlockInfo>> chosenBlocks;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5411dc55/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
index 7cd2e19..3ad45df 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlockQueues.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.Iterator;
+
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
@@ -64,7 +66,7 @@ public class TestUnderReplicatedBlockQueues {
     assertEquals(1, queues.size());
     assertInLevel(queues, block1, 
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY);
     //repeated additions fail
-    assertFalse(queues.add(block1, 1, 0, 3));
+    assertFalse(queues.add(block1, 1, 0, 0, 3));
 
     //add a second block with two replicas
     assertAdded(queues, block2, 2, 0, 3);
@@ -88,11 +90,11 @@ public class TestUnderReplicatedBlockQueues {
     assertAdded(queues, block_corrupt_repl_one, 0, 0, 1);
     assertEquals(2, queues.getCorruptBlockSize());
     assertEquals(1, queues.getCorruptReplOneBlockSize());
-    queues.update(block_corrupt_repl_one, 0, 0, 3, 0, 2);
+    queues.update(block_corrupt_repl_one, 0, 0, 0, 3, 0, 2);
     assertEquals(0, queues.getCorruptReplOneBlockSize());
-    queues.update(block_corrupt, 0, 0, 1, 0, -2);
+    queues.update(block_corrupt, 0, 0, 0, 1, 0, -2);
     assertEquals(1, queues.getCorruptReplOneBlockSize());
-    queues.update(block_very_under_replicated, 0, 0, 1, -4, -24);
+    queues.update(block_very_under_replicated, 0, 0, 0, 1, -4, -24);
     assertEquals(2, queues.getCorruptReplOneBlockSize());
   }
 
@@ -151,7 +153,7 @@ public class TestUnderReplicatedBlockQueues {
                            int expectedReplicas) {
     assertTrue("Failed to add " + block,
                queues.add(block,
-                          curReplicas,
+                          curReplicas, 0,
                           decomissionedReplicas,
                           expectedReplicas));
   }
@@ -169,7 +171,7 @@ public class TestUnderReplicatedBlockQueues {
   private void assertInLevel(UnderReplicatedBlocks queues,
                              Block block,
                              int level) {
-    UnderReplicatedBlocks.BlockIterator bi = queues.iterator(level);
+    final Iterator<BlockInfo> bi = queues.iterator(level);
     while (bi.hasNext()) {
       Block next = bi.next();
       if (block.equals(next)) {

Reply via email to