Repository: hadoop
Updated Branches:
  refs/heads/branch-2 5ca4f0f82 -> 6a6d0ece9


HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned 
while dead. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a6d0ece
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a6d0ece
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a6d0ece

Branch: refs/heads/branch-2
Commit: 6a6d0ece993d1c52fce15b91180ededcdbd2637c
Parents: 5ca4f0f
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon May 1 14:20:37 2017 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon May 1 14:20:37 2017 -0500

----------------------------------------------------------------------
 .../server/blockmanagement/BlockManager.java    |  21 +++-
 .../blockmanagement/UnderReplicatedBlocks.java  |   6 +-
 .../namenode/TestDecommissioningStatus.java     | 111 ++++++++++++++++++-
 3 files changed, 130 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0e5cfc9..31437b8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1804,7 +1804,8 @@ public class BlockManager implements BlockStatsMXBean {
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
-   * We do not use already decommissioned nodes as a source.
+   * We do not use already decommissioned nodes as a source, unless there is
+   * no other choice.
    * Otherwise we choose a random node among those that did not reach their
    * replication limits.  However, if the replication is of the highest 
priority
    * and all nodes have reached their replication limits, we will choose a
@@ -1836,6 +1837,7 @@ public class BlockManager implements BlockStatsMXBean {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
+    DatanodeDescriptor decommissionedSrc = null;
     int live = 0;
     int readonly = 0;
     int decommissioned = 0;
@@ -1889,9 +1891,16 @@ public class BlockManager implements BlockStatsMXBean {
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
-      // never use already decommissioned nodes
-      if(node.isDecommissioned())
+      // Save the live decommissioned replica in case we need it. Such replicas
+      // are normally not used for replication, but if nothing else is
+      // available, one can be selected as a source.
+      if (node.isDecommissioned()) {
+        if (decommissionedSrc == null ||
+            ThreadLocalRandom.current().nextBoolean()) {
+          decommissionedSrc = node;
+        }
         continue;
+      }
       // Don't use dead ENTERING_MAINTENANCE or IN_MAINTENANCE nodes.
       if((!node.isAlive() && node.isEnteringMaintenance()) ||
           node.isInMaintenance()) {
@@ -1912,6 +1921,10 @@ public class BlockManager implements BlockStatsMXBean {
     if(numReplicas != null)
       numReplicas.set(live, readonly, decommissioned, decommissioning, corrupt,
           excess, 0, maintenanceNotForRead, maintenanceForRead);
+    // Pick a live decommissioned replica, if nothing else is available.
+    if (live == 0 && srcNode == null && decommissionedSrc != null) {
+      return decommissionedSrc;
+    }
     return srcNode;
   }
 
@@ -2795,7 +2808,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
-      curReplicaDelta = 1;
+      curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
       if (logEveryBlock) {
         blockLog.debug("BLOCK* addStoredBlock: {} is added to {} (size={})",
             node, storedBlock, storedBlock.getNumBytes());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index bc2fe7a..cd559a7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -310,9 +310,9 @@ class UnderReplicatedBlocks implements Iterable<BlockInfo> {
         " curPri  " + curPri +
         " oldPri  " + oldPri);
     }
-    if(oldPri != curPri) {
-      remove(block, oldPri);
-    }
+    // oldPri is mostly correct, but not always. If not found with oldPri,
+    // other levels will be searched until the block is found & removed.
+    remove(block, oldPri);
     if(priorityQueues.get(curPri).add(block)) {
       NameNode.blockStateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a6d0ece/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 0ca7412..5b61b0a 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -48,6 +48,7 @@ import 
org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -74,6 +75,7 @@ public class TestDecommissioningStatus {
   private static FileSystem fileSys;
   private static HostsFileWriter hostsFileWriter;
   private static Configuration conf;
+  private Logger LOG;
 
   final ArrayList<String> decommissionedNodes = new 
ArrayList<String>(numDatanodes);
   
@@ -91,7 +93,7 @@ public class TestDecommissioningStatus {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         4);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
 
@@ -101,6 +103,7 @@ public class TestDecommissioningStatus {
     cluster.getNamesystem().getBlockManager().getDatanodeManager()
         .setHeartbeatExpireInterval(3000);
     Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+    LOG = Logger.getLogger(TestDecommissioningStatus.class);
   }
 
   @After
@@ -386,4 +389,110 @@ public class TestDecommissioningStatus {
     hostsFileWriter.initExcludeHost("");
     dm.refreshNodes(conf);
   }
+
+  @Test(timeout=120000)
+  public void testDecommissionLosingData() throws Exception {
+    ArrayList<String> nodes = new ArrayList<String>(2);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+    DatanodeManager dm = bm.getDatanodeManager();
+    Path file1 = new Path("decommissionLosingData.dat");
+    writeFile(fileSys, file1, (short)numDatanodes);
+    Thread.sleep(1000);
+
+    // Shutdown dn1
+    LOG.info("Shutdown dn1");
+    DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    String dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Shutdown dn0
+    LOG.info("Shutdown dn0");
+    dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Decommission the nodes.
+    LOG.info("Decommissioning nodes");
+    hostsFileWriter.initExcludeHosts(nodes);
+    dm.refreshNodes(conf);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    assertTrue(dnDescriptor0.isDecommissioned());
+    assertTrue(dnDescriptor1.isDecommissioned());
+
+    // All nodes are dead and decommed. Blocks should be missing.
+    long  missingBlocks = bm.getMissingBlocksCount();
+    long underreplicated = bm.getUnderReplicatedBlocksCount();
+    assertTrue(missingBlocks > 0);
+    assertTrue(underreplicated > 0);
+
+    // Bring back dn0
+    LOG.info("Bring back dn0");
+    cluster.restartDataNode(stoppedDN0, true);
+    do {
+      dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor0 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor0.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Bring back dn1
+    LOG.info("Bring back dn1");
+    cluster.restartDataNode(stoppedDN1, true);
+    do {
+      dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor1 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor1.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Blocks should be still be under-replicated
+    Thread.sleep(2000);  // Let replication monitor run
+    assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+
+    // Start up a node.
+    LOG.info("Starting two more nodes");
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+    // Replication should fix it.
+    int count = 0;
+    while((bm.getUnderReplicatedBlocksCount() > 0 ||
+        bm.getPendingReplicationBlocksCount() > 0) &&
+        count++ < 10) {
+      Thread.sleep(1000);
+    }
+
+    assertEquals(0, bm.getUnderReplicatedBlocksCount());
+    assertEquals(0, bm.getPendingReplicationBlocksCount());
+    assertEquals(0, bm.getMissingBlocksCount());
+
+    // Shutdown the extra nodes.
+    dnID = cluster.getDataNodes().get(3).getDatanodeId();
+    cluster.stopDataNode(3);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    dnID = cluster.getDataNodes().get(2).getDatanodeId();
+    cluster.stopDataNode(2);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Call refreshNodes on FSNamesystem with empty exclude file to remove the
+    // datanode from decommissioning list and make it available again.
+    hostsFileWriter.initExcludeHost("");
+    dm.refreshNodes(conf);
+    fileSys.delete(file1, false);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to