HDFS-9918. Erasure Coding: Sort located striped blocks based on decommissioned states. Contributed by Rakesh R.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6ef42873
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6ef42873
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6ef42873

Branch: refs/heads/HDFS-7240
Commit: 6ef42873a02bfcbff5521869f4d6f66539d1db41
Parents: 600d129
Author: Zhe Zhang <z...@apache.org>
Authored: Tue Apr 12 13:38:58 2016 -0700
Committer: Zhe Zhang <z...@apache.org>
Committed: Tue Apr 12 13:38:58 2016 -0700

----------------------------------------------------------------------
 .../server/blockmanagement/DatanodeManager.java | 115 +++-
 .../hdfs/server/namenode/FSNamesystem.java      |  22 +-
 .../hdfs/TestDecommissionWithStriped.java       |  76 ++-
 .../TestSortLocatedStripedBlock.java            | 557 +++++++++++++++++++
 4 files changed, 731 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
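
In outline: location sorting now also covers striped block groups. Decommissioned (and, when stale-node avoidance is enabled, stale) storages sink to the bottom of the location list, and the striped block's index and token arrays are remapped to follow the new order. Below is a minimal, self-contained sketch of the two comparator behaviours, mirroring DFSUtil.DECOM_COMPARATOR and DFSUtil.DecomStaleComparator; the Health enum is a hypothetical stand-in, not an HDFS type.

import java.util.Arrays;
import java.util.Comparator;

class ComparatorChoiceSketch {
  enum Health { NORMAL, STALE, DECOMMISSIONED }

  public static void main(String[] args) {
    Health[] nodes = { Health.DECOMMISSIONED, Health.STALE, Health.NORMAL };
    // stale-node avoidance enabled: normal < stale < decommissioned
    Comparator<Health> decomStale = Comparator.comparingInt(Health::ordinal);
    // stale-node avoidance disabled: only decommissioned nodes sink
    Comparator<Health> decomOnly =
        Comparator.comparingInt(h -> h == Health.DECOMMISSIONED ? 1 : 0);
    Health[] a = nodes.clone();
    Arrays.sort(a, decomStale); // [NORMAL, STALE, DECOMMISSIONED]
    Health[] b = nodes.clone();
    Arrays.sort(b, decomOnly);  // [STALE, NORMAL, DECOMMISSIONED] (stable sort)
    System.out.println(Arrays.toString(a) + " / " + Arrays.toString(b));
  }
}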


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ef42873/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cd1bdab..da02a90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
@@ -47,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStr
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import java.io.IOException;
@@ -368,49 +370,110 @@ public class DatanodeManager {
 
   }
   
-  /** Sort the located blocks by the distance to the target host. */
-  public void sortLocatedBlocks(final String targethost,
-      final List<LocatedBlock> locatedblocks) {
-    //sort the blocks
+  /**
+   * Sort the non-striped located blocks by the distance to the target host.
+   *
+   * For striped blocks, it will only move decommissioned/stale nodes to the
+   * bottom. For example, assume we have storage list:
+   * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9
+   * mapping to block indices:
+   * 0, 1, 2, 3, 4, 5, 6, 7, 8, 2
+   *
+   * Here the internal block b2 is duplicated, located on both d2 and d9. If
+   * d2 is a decommissioning node, then d2 and d9 should be switched in the
+   * storage list. After sorting the locations, the corresponding block
+   * indices and block tokens are updated as well.
+   */
+  public void sortLocatedBlocks(final String targetHost,
+      final List<LocatedBlock> locatedBlocks) {
+    Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
+        new DFSUtil.DecomStaleComparator(staleInterval) :
+        DFSUtil.DECOM_COMPARATOR;
+    // sort each located block
+    for (LocatedBlock lb : locatedBlocks) {
+      if (lb.isStriped()) {
+        sortLocatedStripedBlock(lb, comparator);
+      } else {
+        sortLocatedBlock(lb, targetHost, comparator);
+      }
+    }
+  }
+
+  /**
+   * Move decommissioned/stale datanodes to the bottom. After sorting, the
+   * block indices and block tokens are updated accordingly.
+   *
+   * @param lb located striped block
+   * @param comparator dn comparator
+   */
+  private void sortLocatedStripedBlock(final LocatedBlock lb,
+      Comparator<DatanodeInfo> comparator) {
+    DatanodeInfo[] di = lb.getLocations();
+    HashMap<DatanodeInfo, Byte> locToIndex = new HashMap<>();
+    HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
+        new HashMap<>();
+    LocatedStripedBlock lsb = (LocatedStripedBlock) lb;
+    for (int i = 0; i < di.length; i++) {
+      locToIndex.put(di[i], lsb.getBlockIndices()[i]);
+      locToToken.put(di[i], lsb.getBlockTokens()[i]);
+    }
+    // Move decommissioned/stale datanodes to the bottom
+    Arrays.sort(di, comparator);
+
+    // must update cache since we modified locations array
+    lb.updateCachedStorageInfo();
+
+    // must update block indices and block tokens accordingly
+    for (int i = 0; i < di.length; i++) {
+      lsb.getBlockIndices()[i] = locToIndex.get(di[i]);
+      lsb.getBlockTokens()[i] = locToToken.get(di[i]);
+    }
+  }
+
+  /**
+   * Move decommissioned/stale datanodes to the bottom. Also, sort nodes by
+   * network distance.
+   *
+   * @param lb located block
+   * @param targetHost target host
+   * @param comparator dn comparator
+   */
+  private void sortLocatedBlock(final LocatedBlock lb, String targetHost,
+      Comparator<DatanodeInfo> comparator) {
     // The target host may not be a datanode, so we should get a generic
     // Node here rather than only a DataNode.
-    Node client = getDatanodeByHost(targethost);
+    Node client = getDatanodeByHost(targetHost);
     if (client == null) {
       List<String> hosts = new ArrayList<> (1);
-      hosts.add(targethost);
+      hosts.add(targetHost);
       List<String> resolvedHosts = dnsToSwitchMapping.resolve(hosts);
       if (resolvedHosts != null && !resolvedHosts.isEmpty()) {
         String rName = resolvedHosts.get(0);
         if (rName != null) {
           client = new NodeBase(rName + NodeBase.PATH_SEPARATOR_STR +
-            targethost);
+            targetHost);
         }
       } else {
         LOG.error("Node Resolution failed. Please make sure that rack " +
           "awareness scripts are functional.");
       }
     }
-    
-    Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
-        new DFSUtil.DecomStaleComparator(staleInterval) : 
-        DFSUtil.DECOM_COMPARATOR;
-        
-    for (LocatedBlock b : locatedblocks) {
-      DatanodeInfo[] di = b.getLocations();
-      // Move decommissioned/stale datanodes to the bottom
-      Arrays.sort(di, comparator);
-      
-      int lastActiveIndex = di.length - 1;
-      while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
-          --lastActiveIndex;
-      }
-      int activeLen = lastActiveIndex + 1;      
-      networktopology.sortByDistance(client, b.getLocations(), activeLen);
-      // must update cache since we modified locations array
-      b.updateCachedStorageInfo();
+
+    DatanodeInfo[] di = lb.getLocations();
+    // Move decommissioned/stale datanodes to the bottom
+    Arrays.sort(di, comparator);
+
+    // Sort nodes by network distance only for located blocks
+    int lastActiveIndex = di.length - 1;
+    while (lastActiveIndex > 0 && isInactive(di[lastActiveIndex])) {
+      --lastActiveIndex;
     }
+    int activeLen = lastActiveIndex + 1;
+    networktopology.sortByDistance(client, lb.getLocations(), activeLen);
+
+    // must update cache since we modified locations array
+    lb.updateCachedStorageInfo();
   }
-  
 
   /** @return the datanode descriptor for the host. */
   public DatanodeDescriptor getDatanodeByHost(final String host) {
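
The essential invariant in sortLocatedStripedBlock() above: position i of the index and token arrays must still describe locations[i] after the shuffle. A stripped-down sketch of that remapping, with String standing in for both DatanodeInfo and the token type (not the committed code):

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;

class StripedRemapSketch {
  static void sortAligned(String[] locs, byte[] indices, String[] tokens,
      Comparator<String> decomLast) {
    // remember which index/token belongs to which location before sorting
    Map<String, Byte> locToIndex = new HashMap<>();
    Map<String, String> locToToken = new HashMap<>();
    for (int i = 0; i < locs.length; i++) {
      locToIndex.put(locs[i], indices[i]);
      locToToken.put(locs[i], tokens[i]);
    }
    // decommissioned/stale locations sink to the bottom
    Arrays.sort(locs, decomLast);
    // realign indices and tokens with the reordered locations
    for (int i = 0; i < locs.length; i++) {
      indices[i] = locToIndex.get(locs[i]);
      tokens[i] = locToToken.get(locs[i]);
    }
  }
}

Like the committed code, the sketch assumes each location appears only once in the array, since the lookup maps are keyed by location.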

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ef42873/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 00e1241..39b3598 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -184,7 +184,6 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeException;
 import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -1779,25 +1778,28 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     }
 
     LocatedBlocks blocks = res.blocks;
+    sortLocatedBlocks(clientMachine, blocks);
+    return blocks;
+  }
+
+  private void sortLocatedBlocks(String clientMachine, LocatedBlocks blocks) {
     if (blocks != null) {
       List<LocatedBlock> blkList = blocks.getLocatedBlocks();
-      if (blkList == null || blkList.size() == 0 ||
-          blkList.get(0) instanceof LocatedStripedBlock) {
-        // no need to sort locations for striped blocks
-        return blocks;
+      if (blkList == null || blkList.size() == 0) {
+        // simply return, block list is empty
+        return;
       }
-      blockManager.getDatanodeManager().sortLocatedBlocks(
-          clientMachine, blkList);
+      blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+          blkList);
 
       // lastBlock is not part of getLocatedBlocks(), might need to sort it too
       LocatedBlock lastBlock = blocks.getLastLocatedBlock();
       if (lastBlock != null) {
         ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
-        blockManager.getDatanodeManager().sortLocatedBlocks(
-            clientMachine, lastBlockList);
+        blockManager.getDatanodeManager().sortLocatedBlocks(clientMachine,
+            lastBlockList);
       }
     }
-    return blocks;
   }
 
   /**

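As the comment in the hunk notes, the last located block is reported outside getLocatedBlocks(), so the extracted helper must run the same sort over it separately. A generic sketch of that shape (hypothetical names; Consumer stands in for the DatanodeManager.sortLocatedBlocks() call):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class SortAllBlocksSketch {
  static <B> void sortAll(List<B> blocks, B lastBlock, Consumer<List<B>> sorter) {
    if (blocks != null && !blocks.isEmpty()) {
      sorter.accept(blocks); // sorts every block's locations in place
    }
    if (lastBlock != null) {
      List<B> lastList = new ArrayList<>();
      lastList.add(lastBlock);
      sorter.accept(lastList); // the last block needs the same treatment
    }
  }
}
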
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ef42873/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
index bde2ceb..c0d8268 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommissionWithStriped.java
@@ -27,6 +27,7 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -49,12 +50,15 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -160,6 +164,13 @@ public class TestDecommissionWithStriped {
   }
 
   @Test(timeout = 120000)
+  public void testFileMultipleBlockGroups() throws Exception {
+    LOG.info("Starting test testFileMultipleBlockGroups");
+    int writeBytes = 2 * blockSize * dataBlocks;
+    testDecommission(writeBytes, 9, 1, "testFileMultipleBlockGroups");
+  }
+
+  @Test(timeout = 120000)
   public void testFileSmallerThanOneCell() throws Exception {
     LOG.info("Starting test testFileSmallerThanOneCell");
     testDecommission(cellSize - 1, 4, 1, "testFileSmallerThanOneCell");
@@ -274,7 +285,15 @@ public class TestDecommissionWithStriped {
 
     int deadDecomissioned = fsn.getNumDecomDeadDataNodes();
     int liveDecomissioned = fsn.getNumDecomLiveDataNodes();
-    ((HdfsDataInputStream) dfs.open(ecFile)).getAllBlocks();
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
     // Decommission node. Verify that node is decommissioned.
     decommissionNode(0, decommisionNodes, AdminStates.DECOMMISSIONED);
 
@@ -290,9 +309,55 @@ public class TestDecommissionWithStriped {
     assertNull(checkFile(dfs, ecFile, storageCount, decommisionNodes, numDNs));
     StripedFileTestUtil.checkData(dfs, ecFile, writeBytes, decommisionNodes,
         null);
+
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+
     cleanupFile(dfs, ecFile);
   }
 
+  private void prepareBlockIndexAndTokenList(List<LocatedBlock> lbs,
+      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
+      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
+    for (LocatedBlock lb : lbs) {
+      HashMap<DatanodeInfo, Byte> locToIndex = new HashMap<DatanodeInfo, Byte>();
+      locToIndexList.add(locToIndex);
+
+      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
+          new HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>();
+      locToTokenList.add(locToToken);
+
+      DatanodeInfo[] di = lb.getLocations();
+      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
+      for (int i = 0; i < di.length; i++) {
+        locToIndex.put(di[i], stripedBlk.getBlockIndices()[i]);
+        locToToken.put(di[i], stripedBlk.getBlockTokens()[i]);
+      }
+    }
+  }
+
+  /**
+   * Verify block index and token values. Block indices and block tokens must
+   * have been updated to match the sorted locations.
+   */
+  private void assertBlockIndexAndTokenPosition(List<LocatedBlock> lbs,
+      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
+      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
+    for (int i = 0; i < lbs.size(); i++) {
+      LocatedBlock lb = lbs.get(i);
+      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
+      HashMap<DatanodeInfo, Byte> locToIndex = locToIndexList.get(i);
+      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
+          locToTokenList.get(i);
+      DatanodeInfo[] di = lb.getLocations();
+      for (int j = 0; j < di.length; j++) {
+        Assert.assertEquals("Block index value mismatches after sorting",
+            (byte) locToIndex.get(di[j]), stripedBlk.getBlockIndices()[j]);
+        Assert.assertEquals("Block token value mismatches after sorting",
+            locToToken.get(di[j]), stripedBlk.getBlockTokens()[j]);
+      }
+    }
+  }
+
   private List<DatanodeInfo> getDecommissionDatanode(DistributedFileSystem dfs,
       Path ecFile, int writeBytes, int decomNodeCount) throws IOException {
     ArrayList<DatanodeInfo> decommissionedNodes = new ArrayList<>();
@@ -447,7 +512,12 @@ public class TestDecommissionWithStriped {
               return "For block " + blk.getBlock() + " replica on " + nodes[j]
                   + " is given as downnode, " + "but is not decommissioned";
             }
-            // TODO: Add check to verify that the Decommissioned node (if any)
+            // Decommissioned node (if any) should only be last node in list.
+            if (j < repl) {
+              return "For block " + blk.getBlock() + " decommissioned node "
+                  + nodes[j] + " was not last node in list: " + (j + 1)
+                  + " of " + nodes.length;
+            }
             // should only be last node in list.
             LOG.info("Block " + blk.getBlock() + " replica on " + nodes[j]
                 + " is decommissioned.");
@@ -470,4 +540,4 @@ public class TestDecommissionWithStriped {
     }
     return null;
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6ef42873/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java
new file mode 100644
index 0000000..1dd067d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSortLocatedStripedBlock.java
@@ -0,0 +1,557 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.StripedFileTestUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.Time;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class tests the sorting of located striped blocks based on
+ * decommissioned states.
+ */
+public class TestSortLocatedStripedBlock {
+  static final Logger LOG = LoggerFactory
+      .getLogger(TestSortLocatedStripedBlock.class);
+  static final int BLK_GROUP_WIDTH = StripedFileTestUtil.NUM_DATA_BLOCKS
+      + StripedFileTestUtil.NUM_PARITY_BLOCKS;
+  static final int NUM_DATA_BLOCKS = StripedFileTestUtil.NUM_DATA_BLOCKS;
+  static final int NUM_PARITY_BLOCKS = StripedFileTestUtil.NUM_PARITY_BLOCKS;
+  static DatanodeManager dm;
+  static final long STALE_INTERVAL = 30 * 1000 * 60;
+
+  @BeforeClass
+  public static void setup() throws IOException {
+    dm = mockDatanodeManager();
+  }
+
+  /**
+   * Test to verify sorting when multiple decommissioned datanodes exist in
+   * the storage lists.
+   *
+   * We have the following storage list, with the indices of decommissioned
+   * internal blocks marked with a ':
+   * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12
+   * mapping to indices
+   * 0', 1', 2, 3, 4, 5, 6, 7', 8', 0, 1, 7, 8
+   *
+   * Decommissioned node indices: 0, 1, 7, 8
+   *
+   * So in the original list, nodes d0, d1, d7 and d8 are in decommissioned
+   * state.
+   *
+   * After sorting the expected block indices list should be,
+   * 0, 1, 2, 3, 4, 5, 6, 7, 8, 0', 1', 7', 8'
+   *
+   * After sorting the expected storage list will be,
+   * d9, d10, d2, d3, d4, d5, d6, d11, d12, d0, d1, d7, d8.
+   *
+   * Note: after sorting block indices will not be in ascending order.
+   */
+  @Test(timeout = 10000)
+  public void testWithMultipleDecommnDatanodes() {
+    LOG.info("Starting test testSortWithMultipleDecommnDatanodes");
+    int lbsCount = 2; // two located block groups
+    List<Integer> decommnNodeIndices = new ArrayList<>();
+    decommnNodeIndices.add(0);
+    decommnNodeIndices.add(1);
+    decommnNodeIndices.add(7);
+    decommnNodeIndices.add(8);
+    List<Integer> targetNodeIndices = new ArrayList<>();
+    targetNodeIndices.addAll(decommnNodeIndices);
+    // map contains decommissioned node details in each located striped block
+    // which will be used for assertions
+    HashMap<Integer, List<String>> decommissionedNodes = new HashMap<>(
+        lbsCount * decommnNodeIndices.size());
+    List<LocatedBlock> lbs = createLocatedStripedBlocks(lbsCount,
+        NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices,
+        targetNodeIndices, decommissionedNodes);
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
+    dm.sortLocatedBlocks(null, lbs);
+
+    assertDecommnNodePosition(BLK_GROUP_WIDTH, decommissionedNodes, lbs);
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+  }
+
+  /**
+   * Test to verify sorting when two decommissioned datanodes exist in the
+   * storage list for the same block index.
+   *
+   * We have the following storage list, with the indices of decommissioned
+   * internal blocks marked with a ':
+   * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12, d13
+   * mapping to indices
+   * 0', 1', 2, 3, 4', 5', 6, 7, 8, 0, 1', 4, 5, 1
+   *
+   * Decommissioned node indices: 0', 1', 4', 5', 1'
+   *
+   * Here two datanodes holding block index 1 have been decommissioned.
+   * So in the original list, nodes d0, d1, d4, d5 and d10 are in
+   * decommissioned state.
+   *
+   * After sorting the expected block indices list will be,
+   * 0, 1, 2, 3, 4, 5, 6, 7, 8, 0', 1', 1', 4', 5'
+   *
+   * After sorting the expected storage list will be,
+   * d9, d13, d2, d3, d11, d12, d6, d7, d8, d0, d1, d10, d4, d5.
+   *
+   * Note: after sorting block indices will not be in ascending order.
+   */
+  @Test(timeout = 10000)
+  public void testTwoDatanodesWithSameBlockIndexAreDecommn() {
+    LOG.info("Starting test testTwoDatanodesWithSameBlockIndexAreDecommn");
+    int lbsCount = 2; // two located block groups
+    List<Integer> decommnNodeIndices = new ArrayList<>();
+    decommnNodeIndices.add(0);
+    decommnNodeIndices.add(1);
+    decommnNodeIndices.add(4);
+    decommnNodeIndices.add(5);
+    // representing blockIndex 1, which is later decommissioned as well
+    decommnNodeIndices.add(1);
+
+    List<Integer> targetNodeIndices = new ArrayList<>();
+    targetNodeIndices.addAll(decommnNodeIndices);
+    // map contains decommissioned node details in each located striped block
+    // which will be used for assertions
+    HashMap<Integer, List<String>> decommissionedNodes = new HashMap<>(
+        lbsCount * decommnNodeIndices.size());
+    List<LocatedBlock> lbs = createLocatedStripedBlocks(lbsCount,
+        NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices,
+        targetNodeIndices, decommissionedNodes);
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
+    dm.sortLocatedBlocks(null, lbs);
+    assertDecommnNodePosition(BLK_GROUP_WIDTH, decommissionedNodes, lbs);
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+  }
+
+  /**
+   * Test to verify sorting when decommissioned datanodes exist in a storage
+   * list that is smaller than the stripe size.
+   *
+   * We have the following storage list, with the indices of decommissioned
+   * internal blocks marked with a ':
+   * d0, d1, d2, d3, d6, d7, d8, d9, d10, d11
+   * mapping to indices
+   * 0', 1, 2', 3, 6, 7, 8, 0, 2', 2
+   *
+   * Decommissioned node indices: 0', 2', 2'
+   *
+   * Here two datanodes holding block index 2 have been decommissioned.
+   * So in the original list, nodes d0, d2 and d10 are in decommissioned state.
+   *
+   * After sorting the expected block indices list should be,
+   * 0, 1, 2, 3, 6, 7, 8, 0', 2', 2'
+   *
+   * After sorting the expected storage list will be,
+   * d9, d1, d11, d3, d6, d7, d8, d0, d2, d10.
+   *
+   * Note: after sorting block indices will not be in ascending order.
+   */
+  @Test(timeout = 10000)
+  public void testSmallerThanOneStripeWithMultipleDecommnNodes()
+      throws Exception {
+    LOG.info("Starting test testSmallerThanOneStripeWithMultipleDecommnNodes");
+    int lbsCount = 2; // two located block groups
+    List<Integer> decommnNodeIndices = new ArrayList<>();
+    decommnNodeIndices.add(0);
+    decommnNodeIndices.add(2);
+    // representing blockIndex 2, which is later decommissioned as well
+    decommnNodeIndices.add(2);
+
+    List<Integer> targetNodeIndices = new ArrayList<>();
+    targetNodeIndices.addAll(decommnNodeIndices);
+    // map contains decommissioned node details in each located striped block
+    // which will be used for assertions
+    HashMap<Integer, List<String>> decommissionedNodes = new HashMap<>(
+        lbsCount * decommnNodeIndices.size());
+    int dataBlksNum = NUM_DATA_BLOCKS - 2;
+    List<LocatedBlock> lbs = createLocatedStripedBlocks(lbsCount, dataBlksNum,
+        NUM_PARITY_BLOCKS, decommnNodeIndices, targetNodeIndices,
+        decommissionedNodes);
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
+    dm.sortLocatedBlocks(null, lbs);
+
+    // After this index all are decommissioned nodes.
+    int blkGrpWidth = dataBlksNum + NUM_PARITY_BLOCKS;
+    assertDecommnNodePosition(blkGrpWidth, decommissionedNodes, lbs);
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+  }
+
+  /**
+   * Test to verify sorting when decommissioned datanodes exist in the storage
+   * list but the corresponding new target datanodes don't exist.
+   *
+   * We have the following storage list, with the indices of decommissioned
+   * internal blocks marked with a ':
+   * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11
+   * mapping to indices
+   * 0', 1', 2', 3, 4', 5', 6, 7, 8, 0, 2, 4
+   *
+   * Decommissioned node indices: 0', 1', 2', 4', 5'
+   *
+   * Nodes 1 and 5 don't exist in the target list. This can happen when the
+   * target node's block is corrupted or lost after successful
+   * decommissioning. So in the original list, the nodes corresponding to the
+   * decommissioned block indices 1 and 5 don't have any target entries.
+   *
+   * After sorting the expected block indices list should be,
+   * 0, 2, 3, 4, 6, 7, 8, 0', 1', 2', 4', 5'
+   *
+   * After sorting the expected storage list will be,
+   * d9, d10, d3, d11, d6, d7, d8, d0, d1, d2, d4, d5.
+   *
+   * Note: after sorting block indices will not be in ascending order.
+   */
+  @Test(timeout = 10000)
+  public void testTargetDecommnDatanodeDoesntExists() {
+    LOG.info("Starting test testTargetDecommnDatanodeDoesntExists");
+    int lbsCount = 2; // two located block groups
+    List<Integer> decommnNodeIndices = new ArrayList<>();
+    decommnNodeIndices.add(0);
+    decommnNodeIndices.add(1);
+    decommnNodeIndices.add(2);
+    decommnNodeIndices.add(4);
+    decommnNodeIndices.add(5);
+
+    List<Integer> targetNodeIndices = new ArrayList<>();
+    targetNodeIndices.add(0);
+    targetNodeIndices.add(2);
+    targetNodeIndices.add(4);
+    // Nodes 1 and 5 don't exist in the target list. One such case: the target
+    // node's block was corrupted or lost after successful decommissioning
+
+    // map contains decommissioned node details in each located striped block
+    // which will be used for assertions
+    HashMap<Integer, List<String>> decommissionedNodes = new HashMap<>(
+        lbsCount * decommnNodeIndices.size());
+    List<LocatedBlock> lbs = createLocatedStripedBlocks(lbsCount,
+        NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices,
+        targetNodeIndices, decommissionedNodes);
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
+    dm.sortLocatedBlocks(null, lbs);
+
+    // After this index all are decommissioned nodes. Two more block indices
+    // need to be reconstructed.
+    int blkGrpWidth = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS - 2;
+    assertDecommnNodePosition(blkGrpWidth, decommissionedNodes, lbs);
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+  }
+
+  /**
+   * Test to verify sorting when multiple in-service and decommissioned
+   * datanodes exist in the storage lists.
+   *
+   * We have the following storage list, with the indices of decommissioned
+   * internal blocks marked with a ':
+   * d0, d1, d2, d3, d4, d5, d6, d7, d8, d9, d10, d11, d12, d13
+   * mapping to indices
+   * 0', 1', 2, 3, 4, 5, 6, 7', 8', 0, 1, 7, 8, 1
+   *
+   * Decommissioned node indices: 0', 1', 7', 8'
+   *
+   * Additional In-Service node d13 at the end, block index: 1
+   *
+   * So in the original list, nodes d0, d1, d7 and d8 are in decommissioned
+   * state.
+   *
+   * After sorting the expected block indices list will be,
+   * 0, 1, 2, 3, 4, 5, 6, 7, 8, 1, 0', 1', 7', 8'
+   *
+   * After sorting the expected storage list will be,
+   * d9, d10, d2, d3, d4, d5, d6, d11, d12, d13, d0, d1, d7, d8.
+   *
+   * Note: after sorting block indices will not be in ascending order.
+   */
+  @Test(timeout = 10000)
+  public void testWithMultipleInServiceAndDecommnDatanodes() {
+    LOG.info("Starting test testWithMultipleInServiceAndDecommnDatanodes");
+    int lbsCount = 2; // two located block groups
+    List<Integer> decommnNodeIndices = new ArrayList<>();
+    decommnNodeIndices.add(0);
+    decommnNodeIndices.add(1);
+    decommnNodeIndices.add(7);
+    decommnNodeIndices.add(8);
+    List<Integer> targetNodeIndices = new ArrayList<>();
+    targetNodeIndices.addAll(decommnNodeIndices);
+
+    // at the end add an additional In-Service node to blockIndex=1
+    targetNodeIndices.add(1);
+
+    // map contains decommissioned node details in each located striped block
+    // which will be used for assertions
+    HashMap<Integer, List<String>> decommissionedNodes = new HashMap<>(
+        lbsCount * decommnNodeIndices.size());
+    List<LocatedBlock> lbs = createLocatedStripedBlocks(lbsCount,
+        NUM_DATA_BLOCKS, NUM_PARITY_BLOCKS, decommnNodeIndices,
+        targetNodeIndices, decommissionedNodes);
+    List <DatanodeInfo> staleDns = new ArrayList<>();
+    for (LocatedBlock lb : lbs) {
+      DatanodeInfo[] locations = lb.getLocations();
+      DatanodeInfo staleDn = locations[locations.length - 1];
+      staleDn
+          .setLastUpdateMonotonic(Time.monotonicNow() - (STALE_INTERVAL * 2));
+      staleDns.add(staleDn);
+    }
+
+    // prepare expected block index and token list.
+    List<HashMap<DatanodeInfo, Byte>> locToIndexList = new ArrayList<>();
+    List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList =
+        new ArrayList<>();
+    prepareBlockIndexAndTokenList(lbs, locToIndexList, locToTokenList);
+
+    dm.sortLocatedBlocks(null, lbs);
+
+    assertDecommnNodePosition(BLK_GROUP_WIDTH + 1, decommissionedNodes, lbs);
+    assertBlockIndexAndTokenPosition(lbs, locToIndexList, locToTokenList);
+
+    for (LocatedBlock lb : lbs) {
+      byte[] blockIndices = ((LocatedStripedBlock) lb).getBlockIndices();
+      // after sorting stale block index will be placed after normal nodes.
+      Assert.assertEquals("Failed to move stale node to bottom!", 1,
+          blockIndices[9]);
+      DatanodeInfo[] locations = lb.getLocations();
+      // After sorting stale node d13 will be placed after normal nodes
+      Assert.assertEquals("Failed to move stale dn after normal one!",
+          staleDns.remove(0), locations[9]);
+    }
+  }
+
+  /**
+   * Verify that decommissioned/stale nodes are positioned after the normal
+   * nodes.
+   */
+  private void assertDecommnNodePosition(int blkGrpWidth,
+      HashMap<Integer, List<String>> decommissionedNodes,
+      List<LocatedBlock> lbs) {
+    for (int i = 0; i < lbs.size(); i++) { // for each block
+      LocatedBlock blk = lbs.get(i);
+      DatanodeInfo[] nodes = blk.getLocations();
+      List<String> decommissionedNodeList = decommissionedNodes.get(i);
+
+      for (int j = 0; j < nodes.length; j++) { // for each replica
+        DatanodeInfo dnInfo = nodes[j];
+        LOG.info("Block Locations size={}, locs={}, j=", nodes.length,
+            dnInfo.toString(), j);
+        if (j < blkGrpWidth) {
+          Assert.assertEquals("Node shouldn't be decommissioned",
+              AdminStates.NORMAL, dnInfo.getAdminState());
+        } else {
+          // check against decommissioned list
+          Assert.assertTrue(
+              "For block " + blk.getBlock() + " decommissioned node " + dnInfo
+                  + " is not last node in list: " + j + "th index of "
+                  + nodes.length,
+              decommissionedNodeList.contains(dnInfo.getXferAddr()));
+          Assert.assertEquals("Node should be decommissioned",
+              AdminStates.DECOMMISSIONED, dnInfo.getAdminState());
+        }
+      }
+    }
+  }
+
+  private List<LocatedBlock> createLocatedStripedBlocks(int blkGrpCount,
+      int dataNumBlk, int numParityBlk, List<Integer> decommnNodeIndices,
+      List<Integer> targetNodeIndices,
+      HashMap<Integer, List<String>> decommissionedNodes) {
+
+    final List<LocatedBlock> lbs = new ArrayList<>(blkGrpCount);
+    for (int i = 0; i < blkGrpCount; i++) {
+      ArrayList<String> decommNodeInfo = new ArrayList<String>();
+      decommissionedNodes.put(new Integer(i), decommNodeInfo);
+      List<Integer> dummyDecommnNodeIndices = new ArrayList<>();
+      dummyDecommnNodeIndices.addAll(decommnNodeIndices);
+
+      LocatedStripedBlock lsb = createEachLocatedBlock(dataNumBlk, numParityBlk,
+          dummyDecommnNodeIndices, targetNodeIndices, decommNodeInfo);
+      lbs.add(lsb);
+    }
+    return lbs;
+  }
+
+  private LocatedStripedBlock createEachLocatedBlock(int numDataBlk,
+      int numParityBlk, List<Integer> decommnNodeIndices,
+      List<Integer> targetNodeIndices, ArrayList<String> decommNodeInfo) {
+    final long blockGroupID = Long.MIN_VALUE;
+    int totalDns = numDataBlk + numParityBlk + targetNodeIndices.size();
+    DatanodeInfo[] locs = new DatanodeInfo[totalDns];
+    String[] storageIDs = new String[totalDns];
+    StorageType[] storageTypes = new StorageType[totalDns];
+    byte[] blkIndices = new byte[totalDns];
+
+    // Adding data blocks
+    int index = 0;
+    for (; index < numDataBlk; index++) {
+      blkIndices[index] = (byte) index;
+      // Location port always equal to logical index of a block,
+      // for easier verification
+      locs[index] = DFSTestUtil.getLocalDatanodeInfo(blkIndices[index]);
+      locs[index].setLastUpdateMonotonic(Time.monotonicNow());
+      storageIDs[index] = locs[index].getDatanodeUuid();
+      storageTypes[index] = StorageType.DISK;
+      // set decommissioned state
+      if (decommnNodeIndices.contains(index)) {
+        locs[index].setDecommissioned();
+        decommNodeInfo.add(locs[index].toString());
+        // Removing it from the list to ensure that all the given nodes are
+        // successfully marked as decommissioned.
+        decommnNodeIndices.remove(new Integer(index));
+      }
+    }
+    // Adding parity blocks after data blocks
+    index = NUM_DATA_BLOCKS;
+    for (int j = numDataBlk; j < numDataBlk + numParityBlk; j++, index++) {
+      blkIndices[j] = (byte) index;
+      // Location port always equal to logical index of a block,
+      // for easier verification
+      locs[j] = DFSTestUtil.getLocalDatanodeInfo(blkIndices[j]);
+      locs[j].setLastUpdateMonotonic(Time.monotonicNow());
+      storageIDs[j] = locs[j].getDatanodeUuid();
+      storageTypes[j] = StorageType.DISK;
+      // set decommissioned state
+      if (decommnNodeIndices.contains(index)) {
+        locs[j].setDecommissioned();
+        decommNodeInfo.add(locs[j].toString());
+        // Removing it from the list to ensure that all the given nodes are
+        // successfully marked as decommissioned.
+        decommnNodeIndices.remove(new Integer(index));
+      }
+    }
+    // Add extra target nodes to storage list after the parity blocks
+    int basePortValue = NUM_DATA_BLOCKS + NUM_PARITY_BLOCKS;
+    index = numDataBlk + numParityBlk;
+    for (int i = 0; i < targetNodeIndices.size(); i++, index++) {
+      int blkIndexPos = targetNodeIndices.get(i);
+      blkIndices[index] = (byte) blkIndexPos;
+      // Target location ports continue sequentially after the data and
+      // parity blocks, for easier verification
+      locs[index] = DFSTestUtil.getLocalDatanodeInfo(basePortValue++);
+      locs[index].setLastUpdateMonotonic(Time.monotonicNow());
+      storageIDs[index] = locs[index].getDatanodeUuid();
+      storageTypes[index] = StorageType.DISK;
+      // set decommissioned state. This can happen when the target node is
+      // decommissioned again by the administrator
+      if (decommnNodeIndices.contains(blkIndexPos)) {
+        locs[index].setDecommissioned();
+        decommNodeInfo.add(locs[index].toString());
+        // Removing it from the list to ensure that all the given nodes are
+        // successfully marked as decommissioned.
+        decommnNodeIndices.remove(new Integer(blkIndexPos));
+      }
+    }
+    return new LocatedStripedBlock(
+        new ExtendedBlock("pool", blockGroupID,
+            StripedFileTestUtil.BLOCK_STRIPED_CELL_SIZE, 1001),
+        locs, storageIDs, storageTypes, blkIndices, 0, false, null);
+  }
+
+  private static DatanodeManager mockDatanodeManager() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(
+        DFSConfigKeys.DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY, true);
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
+        STALE_INTERVAL);
+    FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
+    BlockManager bm = Mockito.mock(BlockManager.class);
+    BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
+    Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
+    DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
+    return dm;
+  }
+
+  private void prepareBlockIndexAndTokenList(List<LocatedBlock> lbs,
+      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
+      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
+    for (LocatedBlock lb : lbs) {
+      HashMap<DatanodeInfo, Byte> locToIndex = new HashMap<DatanodeInfo, Byte>();
+      locToIndexList.add(locToIndex);
+
+      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
+          new HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>();
+      locToTokenList.add(locToToken);
+
+      DatanodeInfo[] di = lb.getLocations();
+      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
+      for (int i = 0; i < di.length; i++) {
+        locToIndex.put(di[i], stripedBlk.getBlockIndices()[i]);
+        locToToken.put(di[i], stripedBlk.getBlockTokens()[i]);
+      }
+    }
+  }
+
+  /**
+   * Verify block index and token values. Block indices and block tokens must
+   * have been updated to match the sorted locations.
+   */
+  private void assertBlockIndexAndTokenPosition(List<LocatedBlock> lbs,
+      List<HashMap<DatanodeInfo, Byte>> locToIndexList,
+      List<HashMap<DatanodeInfo, Token<BlockTokenIdentifier>>> locToTokenList) {
+    for (int i = 0; i < lbs.size(); i++) {
+      LocatedBlock lb = lbs.get(i);
+      LocatedStripedBlock stripedBlk = (LocatedStripedBlock) lb;
+      HashMap<DatanodeInfo, Byte> locToIndex = locToIndexList.get(i);
+      HashMap<DatanodeInfo, Token<BlockTokenIdentifier>> locToToken =
+          locToTokenList.get(i);
+      DatanodeInfo[] di = lb.getLocations();
+      for (int j = 0; j < di.length; j++) {
+        Assert.assertEquals("Block index value mismatches after sorting",
+            (byte) locToIndex.get(di[j]), stripedBlk.getBlockIndices()[j]);
+        Assert.assertEquals("Block token value mismatches after sorting",
+            locToToken.get(di[j]), stripedBlk.getBlockTokens()[j]);
+      }
+    }
+  }
+}
\ No newline at end of file
