Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 8ad9efbe1 -> 139327d34


HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned while dead. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/139327d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/139327d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/139327d3

Branch: refs/heads/branch-2.7
Commit: 139327d34b49eddda17b2efda3aeda1030170b92
Parents: 8ad9efb
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon May 1 14:31:59 2017 -0500
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon May 1 14:31:59 2017 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  21 +++-
 .../blockmanagement/UnderReplicatedBlocks.java  |   6 +-
 .../namenode/TestDecommissioningStatus.java     | 111 ++++++++++++++++++-
 4 files changed, 133 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/139327d3/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 5159459..6a74719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -253,6 +253,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11709. StandbyCheckpointer should handle an non-existing legacyOivImageDir
     gracefully. (Erik Krogen via zhz)
 
+    HDFS-11609. Some blocks can be permanently lost if nodes are decommissioned
+    while dead. (kihwal)
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/139327d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 85f9201..72a4295 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1630,7 +1630,8 @@ public class BlockManager {
    *
    * We prefer nodes that are in DECOMMISSION_INPROGRESS state to other nodes
    * since the former do not have write traffic and hence are less busy.
-   * We do not use already decommissioned nodes as a source.
+   * We do not use already decommissioned nodes as a source, unless there is
+   * no other choice.
    * Otherwise we choose a random node among those that did not reach their
   * replication limits.  However, if the replication is of the highest priority
    * and all nodes have reached their replication limits, we will choose a
@@ -1662,6 +1663,7 @@ public class BlockManager {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
+    DatanodeDescriptor decommissionedSrc = null;
     int live = 0;
     int decommissioned = 0;
     int decommissioning = 0;
@@ -1704,9 +1706,16 @@ public class BlockManager {
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
-      // never use already decommissioned nodes
-      if(node.isDecommissioned())
+      // Save the live decommissioned replica in case we need it. Such replicas
+      // are normally not used for replication, but if nothing else is
+      // available, one can be selected as a source.
+      if (node.isDecommissioned()) {
+        if (decommissionedSrc == null ||
+            DFSUtil.getRandom().nextBoolean()) {
+          decommissionedSrc = node;
+        }
         continue;
+      }
 
       // We got this far, current node is a reasonable choice
       if (srcNode == null) {
@@ -1722,6 +1731,10 @@ public class BlockManager {
     if(numReplicas != null)
       numReplicas.initialize(live, decommissioned, decommissioning, corrupt,
           excess, 0);
+    // Pick a live decommissioned replica, if nothing else is available.
+    if (live == 0 && srcNode == null && decommissionedSrc != null) {
+      return decommissionedSrc;
+    }
     return srcNode;
   }
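
Taken together, the hunks above make chooseSourceDatanode() remember one live decommissioned replica and fall back to it only when no other source survives. The pattern can be shown as a minimal standalone sketch; the class and method names below are illustrative inventions, only DatanodeDescriptor is a real HDFS type, and the real method additionally weighs replication limits and prefers DECOMMISSION_INPROGRESS nodes:

    import java.util.List;
    import java.util.Random;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    class DecommissionedSourceSketch {
      // Prefer any non-decommissioned replica as the copy source; remember
      // one live decommissioned replica and return it only as a last resort.
      static DatanodeDescriptor chooseSource(List<DatanodeDescriptor> replicas,
          Random random) {
        DatanodeDescriptor srcNode = null;
        DatanodeDescriptor decommissionedSrc = null;
        for (DatanodeDescriptor node : replicas) {
          if (node.isDecommissioned()) {
            // Coin-flip replacement, as in the hunk above, so that one
            // decommissioned node is not always the chosen fallback.
            if (decommissionedSrc == null || random.nextBoolean()) {
              decommissionedSrc = node;
            }
            continue;
          }
          if (srcNode == null) {
            srcNode = node;  // first healthy replica wins in this sketch
          }
        }
        // A decommissioned replica may serve as the source only when no
        // live replica exists at all.
        return (srcNode != null) ? srcNode : decommissionedSrc;
      }
    }

Note that nextBoolean() replacement is deliberately cheap rather than uniform: later decommissioned candidates are kept with higher probability than earlier ones. A uniform pick would need reservoir sampling (replace the i-th candidate with probability 1/i), which the committed code does not attempt.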
 
@@ -2549,7 +2562,7 @@ public class BlockManager {
 
     int curReplicaDelta;
     if (result == AddBlockResult.ADDED) {
-      curReplicaDelta = 1;
+      curReplicaDelta = (node.isDecommissioned()) ? 0 : 1;
       if (logEveryBlock) {
         logAddStoredBlock(storedBlock, node);
       }
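
The @@ -2562 hunk is the other half of the fix. When a node that was decommissioned while dead rejoins and block reports, addStoredBlock() sees AddBlockResult.ADDED, but the replica must not raise the live-replica count: if it did, the block could leave the under-replicated queues without ever being copied to a healthy node. A hypothetical helper (again, only DatanodeDescriptor is a real type) makes the counting rule explicit:

    import java.util.List;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    class ReplicaCountSketch {
      // Mirrors curReplicaDelta above: a replica reported by a decommissioned
      // node is readable but contributes 0 to the live count, so the block
      // stays queued for re-replication.
      static int countLive(List<DatanodeDescriptor> replicas) {
        int live = 0;
        for (DatanodeDescriptor node : replicas) {
          if (!node.isDecommissioned()) {
            live++;
          }
        }
        return live;
      }
    }

For example, with replication factor 3 and both surviving replicas on decommissioned nodes, live stays 0, so the block keeps the highest replication priority, and the decommissioned replicas are now eligible as last-resort sources.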

http://git-wip-us.apache.org/repos/asf/hadoop/blob/139327d3/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 1cc42da..5275f44 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -305,9 +305,9 @@ class UnderReplicatedBlocks implements Iterable<Block> {
         " curPri  " + curPri +
         " oldPri  " + oldPri);
     }
-    if(oldPri != curPri) {
-      remove(block, oldPri);
-    }
+    // oldPri is mostly correct, but not always. If not found with oldPri,
+    // other levels will be searched until the block is found & removed.
+    remove(block, oldPri);
     if(priorityQueues.get(curPri).add(block)) {
       NameNode.blockStateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.update: {} has only {} " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/139327d3/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
index 1c5d369..26a1cc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@@ -75,6 +76,7 @@ public class TestDecommissioningStatus {
   private static FileSystem localFileSys;
   private static Configuration conf;
   private static Path dir;
+  private static Logger LOG;
 
   final ArrayList<String> decommissionedNodes = new ArrayList<String>(numDatanodes);
   
@@ -98,7 +100,7 @@ public class TestDecommissioningStatus {
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         4);
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 1);
 
@@ -111,6 +113,7 @@ public class TestDecommissioningStatus {
     cluster.getNamesystem().getBlockManager().getDatanodeManager()
         .setHeartbeatExpireInterval(3000);
     Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
+    LOG = Logger.getLogger(TestDecommissioningStatus.class);
   }
 
   @AfterClass
@@ -431,4 +434,110 @@ public class TestDecommissioningStatus {
     writeConfigFile(localFileSys, excludeFile, null);
     dm.refreshNodes(conf);
   }
+
+  @Test(timeout=120000)
+  public void testDecommissionLosingData() throws Exception {
+    ArrayList<String> nodes = new ArrayList<String>(2);
+    FSNamesystem fsn = cluster.getNamesystem();
+    BlockManager bm = fsn.getBlockManager();
+    DatanodeManager dm = bm.getDatanodeManager();
+    Path file1 = new Path("decommissionLosingData.dat");
+    writeFile(fileSys, file1, (short)numDatanodes);
+    Thread.sleep(1000);
+
+    // Shutdown dn1
+    LOG.info("Shutdown dn1");
+    DatanodeID dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    String dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor1 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN1 = cluster.stopDataNode(1);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Shutdown dn0
+    LOG.info("Shutdown dn0");
+    dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    dnName = dnID.getXferAddr();
+    DatanodeDescriptor dnDescriptor0 = dm.getDatanode(dnID);
+    nodes.add(dnName);
+    DataNodeProperties stoppedDN0 = cluster.stopDataNode(0);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Decommission the nodes.
+    LOG.info("Decommissioning nodes");
+    writeConfigFile(localFileSys, excludeFile, nodes);
+    dm.refreshNodes(conf);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
+    assertTrue(dnDescriptor0.isDecommissioned());
+    assertTrue(dnDescriptor1.isDecommissioned());
+
+    // All nodes are dead and decommed. Blocks should be missing.
+    long  missingBlocks = bm.getMissingBlocksCount();
+    long underreplicated = bm.getUnderReplicatedBlocksCount();
+    assertTrue(missingBlocks > 0);
+    assertTrue(underreplicated > 0);
+
+    // Bring back dn0
+    LOG.info("Bring back dn0");
+    cluster.restartDataNode(stoppedDN0, true);
+    do {
+      dnID = cluster.getDataNodes().get(0).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor0 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor0.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Bring back dn1
+    LOG.info("Bring back dn1");
+    cluster.restartDataNode(stoppedDN1, true);
+    do {
+      dnID = cluster.getDataNodes().get(1).getDatanodeId();
+    } while (dnID == null);
+    dnDescriptor1 = dm.getDatanode(dnID);
+    // Wait until it sends a block report.
+    while (dnDescriptor1.numBlocks() == 0) {
+      Thread.sleep(100);
+    }
+
+    // Blocks should still be under-replicated
+    Thread.sleep(2000);  // Let replication monitor run
+    assertEquals(underreplicated, bm.getUnderReplicatedBlocksCount());
+
+    // Start up two more nodes.
+    LOG.info("Starting two more nodes");
+    cluster.startDataNodes(conf, 2, true, null, null);
+    cluster.waitActive();
+    // Replication should fix it.
+    int count = 0;
+    while((bm.getUnderReplicatedBlocksCount() > 0 ||
+        bm.getPendingReplicationBlocksCount() > 0) &&
+        count++ < 10) {
+      Thread.sleep(1000);
+    }
+
+    assertEquals(0, bm.getUnderReplicatedBlocksCount());
+    assertEquals(0, bm.getPendingReplicationBlocksCount());
+    assertEquals(0, bm.getMissingBlocksCount());
+
+    // Shutdown the extra nodes.
+    dnID = cluster.getDataNodes().get(3).getDatanodeId();
+    cluster.stopDataNode(3);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    dnID = cluster.getDataNodes().get(2).getDatanodeId();
+    cluster.stopDataNode(2);
+    DFSTestUtil.waitForDatanodeState(cluster, dnID.getDatanodeUuid(),
+        false, 30000);
+
+    // Call refreshNodes on FSNamesystem with empty exclude file to remove the
+    // datanode from decommissioning list and make it available again.
+    writeConfigFile(localFileSys, excludeFile, null);
+    dm.refreshNodes(conf);
+    fileSys.delete(file1, false);
+  }
 }
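
The new test's block-report waits poll in open-ended while/sleep loops, with the @Test timeout as the only backstop. A bounded alternative is sketched below, on the assumption that org.apache.hadoop.test.GenericTestUtils.waitFor, which in 2.7-era code takes a Guava Supplier plus poll and timeout intervals in milliseconds, is available on this branch:

    import java.util.concurrent.TimeoutException;

    import com.google.common.base.Supplier;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.test.GenericTestUtils;

    class BlockReportWaitSketch {
      // Wait until the restarted datanode has reported at least one block,
      // polling every 100 ms and giving up after 30 s instead of spinning
      // until the test-level timeout fires.
      static void waitForFirstBlockReport(final DatanodeDescriptor dn)
          throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return dn.numBlocks() > 0;
          }
        }, 100, 30000);
      }
    }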

